diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/Buffer.cpp | 1 | ||||
| -rw-r--r-- | src/CicEqualizer.cpp | 12 | ||||
| -rw-r--r-- | src/DabMod.cpp | 476 | ||||
| -rw-r--r-- | src/DabModulator.cpp | 155 | ||||
| -rw-r--r-- | src/DabModulator.h | 13 | ||||
| -rw-r--r-- | src/EtiReader.cpp | 27 | ||||
| -rw-r--r-- | src/EtiReader.h | 13 | ||||
| -rw-r--r-- | src/Flowgraph.cpp | 82 | ||||
| -rw-r--r-- | src/Flowgraph.h | 32 | ||||
| -rw-r--r-- | src/FormatConverter.cpp | 103 | ||||
| -rw-r--r-- | src/FormatConverter.h | 53 | ||||
| -rw-r--r-- | src/FrameMultiplexer.cpp | 8 | ||||
| -rw-r--r-- | src/FrameMultiplexer.h | 9 | ||||
| -rw-r--r-- | src/InputFileReader.cpp | 18 | ||||
| -rw-r--r-- | src/InputReader.h | 25 | ||||
| -rw-r--r-- | src/InputZeroMQReader.cpp | 33 | ||||
| -rw-r--r-- | src/Makefile.am | 126 | ||||
| -rw-r--r-- | src/OutputMemory.h | 2 | ||||
| -rw-r--r-- | src/OutputUHD.cpp | 165 | ||||
| -rw-r--r-- | src/OutputUHD.h | 44 | ||||
| -rw-r--r-- | src/OutputZeroMQ.cpp | 32 | ||||
| -rw-r--r-- | src/OutputZeroMQ.h | 9 | ||||
| -rw-r--r-- | src/RemoteControl.cpp | 146 | ||||
| -rw-r--r-- | src/RemoteControl.h | 127 | ||||
| -rw-r--r-- | src/TimestampDecoder.cpp | 4 | ||||
| -rw-r--r-- | src/TimestampDecoder.h | 11 | ||||
| -rw-r--r-- | src/Utils.cpp | 121 | ||||
| -rw-r--r-- | src/Utils.h | 44 | 
28 files changed, 1320 insertions, 571 deletions
| diff --git a/src/Buffer.cpp b/src/Buffer.cpp index aa0ef4c..fa7f52f 100644 --- a/src/Buffer.cpp +++ b/src/Buffer.cpp @@ -47,6 +47,7 @@ Buffer::Buffer(size_t len, const void *data)  Buffer::~Buffer()  { +    PDEBUG("Buffer::~Buffer() len=%zu, data=%p\n", len, data);      free(data);  } diff --git a/src/CicEqualizer.cpp b/src/CicEqualizer.cpp index d8eb2ee..a9c0dd6 100644 --- a/src/CicEqualizer.cpp +++ b/src/CicEqualizer.cpp @@ -46,11 +46,12 @@ CicEqualizer::CicEqualizer(size_t nbCarriers, size_t spacing, int R) :          float angle = pi * k / spacing;          if (k == 0) {              myFilter[i] = 1.0f; -	} else { -		myFilter[i] = sinf(angle / R) / sinf(angle * M); -		myFilter[i] = fabsf(myFilter[i]) * R * M; -		myFilter[i] = powf(myFilter[i], N); -	} +        } +        else { +            myFilter[i] = sinf(angle / R) / sinf(angle * M); +            myFilter[i] = fabsf(myFilter[i]) * R * M; +            myFilter[i] = powf(myFilter[i], N); +        }          PDEBUG("HCic[%zu -> %i] = %f (%f dB) -> angle: %f\n",                  i, k,myFilter[i], 20.0 * log10(myFilter[i]), angle);      } @@ -93,3 +94,4 @@ int CicEqualizer::process(Buffer* const dataIn, Buffer* dataOut)      return sizeOut;  } + diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 91c0b9d..ec1a4cd 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -3,7 +3,7 @@     Her Majesty the Queen in Right of Canada (Communications Research     Center Canada) -   Copyright (C) 2014 +   Copyright (C) 2014, 2015     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -30,11 +30,12 @@  #endif  #include "porting.h" - +#include "Utils.h"  #include "Log.h"  #include "DabModulator.h"  #include "InputMemory.h"  #include "OutputFile.h" +#include "FormatConverter.h"  #if defined(HAVE_OUTPUT_UHD)  #   include "OutputUHD.h"  #endif @@ -45,6 +46,8 @@  #include "FIRFilter.h"  #include "RemoteControl.h" +#include <boost/shared_ptr.hpp> +#include <boost/make_shared.hpp>  #include <boost/property_tree/ptree.hpp>  #include <boost/property_tree/ini_parser.hpp>  #include <complex> @@ -67,125 +70,78 @@  #   define memalign(a, b)   malloc(b)  #endif +#define ZMQ_INPUT_MAX_FRAME_QUEUE 50 +  typedef std::complex<float> complexf; +using namespace boost; -bool running = true; +volatile sig_atomic_t running = 1;  void signalHandler(int signalNb)  {      PDEBUG("signalHandler(%i)\n", signalNb); -    running = false; +    running = 0;  } - -void printUsage(char* progName, FILE* out = stderr) +struct modulator_data  { -    fprintf(out, "Welcome to %s %s, compiled at %s, %s\n\n", -            PACKAGE, -#if defined(GITVERSION) -            GITVERSION, -#else -            VERSION, -#endif -            __DATE__, __TIME__); -    fprintf(out, "Usage with configuration file:\n"); -    fprintf(out, "\t%s [-C] config_file.ini\n\n", progName); - -    fprintf(out, "Usage with command line options:\n"); -    fprintf(out, "\t%s" -            " input" -            " (-f filename | -u uhddevice -F frequency) " -            " [-G txgain]" -            " [-o offset]" -            " [-O offsetfile]" -            " [-T filter_taps_file]" -            " [-a gain]" -            " [-c clockrate]" -            " [-g gainMode]" -            " [-h]" -            " [-l]" -            " [-m dabMode]" -            " [-r samplingRate]" -            "\n", progName); -    fprintf(out, "Where:\n"); -    fprintf(out, "input:         ETI input filename (default: stdin).\n"); -    fprintf(out, "-f name:       Use file output with given filename. (use /dev/stdout for standard output)\n"); -    fprintf(out, "-u device:     Use UHD output with given device string. (use "" for default device)\n"); -    fprintf(out, "-F frequency:  Set the transmit frequency when using UHD output. (mandatory option when using UHD)\n"); -    fprintf(out, "-G txgain:     Set the transmit gain for the UHD driver (default: 0)\n"); -    fprintf(out, "-o:            (UHD only) Set the timestamp offset added to the timestamp in the ETI. The offset is a double.\n"); -    fprintf(out, "-O:            (UHD only) Set the file containing the timestamp offset added to the timestamp in the ETI.\n" -                                 "The file is read every six seconds, and must contain a double value.\n"); -    fprintf(out, "                  Specifying either -o or -O has two implications: It enables synchronous transmission,\n" -                 "                  requiring an external REFCLK and PPS signal and frames that do not contain a valid timestamp\n" -                 "                  get muted.\n\n"); -    fprintf(out, "-T taps_file:  Enable filtering before the output, using the specified file containing the filter taps.\n"); -    fprintf(out, "-a gain:       Apply digital amplitude gain.\n"); -    fprintf(out, "-c rate:       Set the DAC clock rate and enable Cic Equalisation.\n"); -    fprintf(out, "-g:            Set computation gain mode: " -            "%u FIX, %u MAX, %u VAR\n", GAIN_FIX, GAIN_MAX, GAIN_VAR); -    fprintf(out, "-h:            Print this help.\n"); -    fprintf(out, "-l:            Loop file when reach end of file.\n"); -    fprintf(out, "-m mode:       Set DAB mode: (0: auto, 1-4: force).\n"); -    fprintf(out, "-r rate:       Set output sampling rate (default: 2048000).\n"); -} +    modulator_data() : +        inputReader(NULL), +        framecount(0), +        flowgraph(NULL), +        rcs(NULL) {} +    InputReader* inputReader; +    Buffer data; +    uint64_t framecount; -void printVersion(FILE *out = stderr) -{ -    fprintf(out, "Welcome to %s %s, compiled at %s, %s\n\n", -            PACKAGE, VERSION, __DATE__, __TIME__); -    fprintf(out, -            "    ODR-DabMod is copyright (C) Her Majesty the Queen in Right of Canada,\n" -            "    2009, 2010, 2011, 2012 Communications Research Centre (CRC),\n" -            "     and\n" -            "    Copyright (C) 2014 Matthias P. Braendli, matthias.braendli@mpb.li\n" -            "\n" -            "    http://opendigitalradio.org\n" -            "\n" -            "    This program is available free of charge and is licensed to you on a\n" -            "    non-exclusive basis; you may not redistribute it.\n" -            "\n" -            "    This program is provided \"AS IS\" in the hope that it will be useful, but\n" -            "    WITHOUT ANY WARRANTY with respect to its accurancy or usefulness; witout\n" -            "    even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR\n" -            "    PURPOSE and NONINFRINGEMENT.\n" -            "\n" -            "    In no event shall CRC be LIABLE for any LOSS, DAMAGE or COST that may be\n" -            "    incurred in connection with the use of this software.\n" -            "\n" -#if USE_KISS_FFT -            "ODR-DabMod makes use of the following open source packages:\n" -            "    Kiss FFT v1.2.9 (Revised BSD) - http://kissfft.sourceforge.net/\n" -#endif -           ); +    Flowgraph* flowgraph; +    RemoteControllers* rcs; +}; -} +enum run_modulator_state { +    MOD_FAILURE, +    MOD_NORMAL_END, +    MOD_AGAIN +}; +run_modulator_state run_modulator(Logger& logger, modulator_data& m); -int main(int argc, char* argv[]) +int launch_modulator(int argc, char* argv[])  {      int ret = 0;      bool loop = false;      std::string inputName = "";      std::string inputTransport = "file"; +    unsigned inputMaxFramesQueued = ZMQ_INPUT_MAX_FRAME_QUEUE;      std::string outputName;      int useZeroMQOutput = 0; +    std::string zmqOutputSocketType = "";      int useFileOutput = 0; +    std::string fileOutputFormat = "complexf";      int useUHDOutput = 0; -    uint64_t frame = 0;      size_t outputRate = 2048000;      size_t clockRate = 0;      unsigned dabMode = 0;      float digitalgain = 1.0f;      float normalise = 1.0f;      GainMode gainMode = GAIN_VAR; -    Buffer data; + + +    /* UHD requires the input I and Q samples to be in the interval +     * [-1.0,1.0], otherwise they get truncated, which creates very +     * wide-spectrum spikes. Depending on the Transmission Mode, the +     * Gain Mode and the sample rate (and maybe other parameters), the +     * samples can have peaks up to about 48000. The value of 50000 +     * should guarantee that with a digital gain of 1.0, UHD never clips +     * our samples. +     */ +    const float normalise_factor = 50000.0f;      std::string filterTapsFilename = ""; @@ -198,27 +154,37 @@ int main(int argc, char* argv[])      OutputUHDConfig outputuhd_conf;  #endif +    modulator_data m; +      // To handle the timestamp offset of the modulator      struct modulator_offset_config modconf;      modconf.use_offset_file = false;      modconf.use_offset_fixed = false;      modconf.delay_calculation_pipeline_stages = 0; -    Flowgraph* flowgraph = NULL; -    DabModulator* modulator = NULL; -    InputMemory* input = NULL; -    ModOutput* output = NULL; +    shared_ptr<Flowgraph> flowgraph(new Flowgraph()); +    shared_ptr<FormatConverter> format_converter; +    shared_ptr<ModOutput> output; + +    RemoteControllers rcs; +    m.rcs = &rcs; -    BaseRemoteController* rc = NULL; +    bool run_again = true;      Logger logger;      InputFileReader inputFileReader(logger); -#if defined(HAVE_INPUT_ZEROMQ) -    InputZeroMQReader inputZeroMQReader(logger); +#if defined(HAVE_ZEROMQ) +    shared_ptr<InputZeroMQReader> inputZeroMQReader(new InputZeroMQReader(logger));  #endif -    InputReader* inputReader; -    signal(SIGINT, signalHandler); +    struct sigaction sa; +    memset(&sa, 0, sizeof(struct sigaction)); +    sa.sa_handler = &signalHandler; + +    if (sigaction(SIGINT, &sa, NULL) == -1) { +        perror("sigaction"); +        return EXIT_FAILURE; +    }      // Set timezone to UTC      setenv("TZ", "", 1); @@ -250,7 +216,7 @@ int main(int argc, char* argv[])  #if defined(HAVE_OUTPUT_UHD)              if (useUHDOutput) {                  fprintf(stderr, "Options -u and -f are mutually exclusive\n"); -                goto END_MAIN; +                throw std::invalid_argument("Invalid command line options");              }  #endif              outputName = optarg; @@ -276,7 +242,7 @@ int main(int argc, char* argv[])              if (modconf.use_offset_file)              {                  fprintf(stderr, "Options -o and -O are mutually exclusive\n"); -                goto END_MAIN; +                throw std::invalid_argument("Invalid command line options");              }              modconf.use_offset_fixed = true;              modconf.offset_fixed = strtod(optarg, NULL); @@ -288,7 +254,7 @@ int main(int argc, char* argv[])              if (modconf.use_offset_fixed)              {                  fprintf(stderr, "Options -o and -O are mutually exclusive\n"); -                goto END_MAIN; +                throw std::invalid_argument("Invalid command line options");              }              modconf.use_offset_file = true;              modconf.offset_filename = std::string(optarg); @@ -309,7 +275,7 @@ int main(int argc, char* argv[])  #if defined(HAVE_OUTPUT_UHD)              if (useFileOutput) {                  fprintf(stderr, "Options -u and -f are mutually exclusive\n"); -                goto END_MAIN; +                throw std::invalid_argument("Invalid command line options");              }              outputuhd_conf.device = optarg;              useUHDOutput = 1; @@ -317,17 +283,17 @@ int main(int argc, char* argv[])              break;          case 'V':              printVersion(); -            goto END_MAIN; +            throw std::invalid_argument("");              break;          case '?':          case 'h':              printUsage(argv[0]); -            goto END_MAIN; +            throw std::invalid_argument("");              break;          default:              fprintf(stderr, "Option '%c' not coded yet!\n", c);              ret = -1; -            goto END_MAIN; +            throw std::invalid_argument("Invalid command line options");          }      } @@ -352,15 +318,12 @@ int main(int argc, char* argv[])          "\n";      std::cerr << "Compiled with features: " << -#if defined(HAVE_INPUT_ZEROMQ) -        "input_zeromq " << +#if defined(HAVE_ZEROMQ) +        "zeromq " <<  #endif  #if defined(HAVE_OUTPUT_UHD)          "output_uhd " <<  #endif -#if defined(HAVE_OUTPUT_ZEROMQ) -        "output_zeromq " << -#endif          "\n";      if (use_configuration_file && use_configuration_cmdline) { @@ -371,7 +334,7 @@ int main(int argc, char* argv[])      // No argument given ? You can't be serious ! Show usage.      if (argc == 1) {          printUsage(argv[0]); -        goto END_MAIN; +        throw std::invalid_argument("Invalid command line options");      }      // If only one argument is given, interpret as configuration file name @@ -390,8 +353,9 @@ int main(int argc, char* argv[])          }          catch (boost::property_tree::ini_parser::ini_parser_error &e)          { -            fprintf(stderr, "Error, cannot read configuration file '%s'\n", configuration_file.c_str()); -            goto END_MAIN; +            std::cerr << "Error, cannot read configuration file '" << configuration_file.c_str() << "'" << std::endl; +            std::cerr << "       " << e.what() << std::endl; +            throw std::runtime_error("Cannot read configuration file");          }          // remote controller: @@ -399,14 +363,30 @@ int main(int argc, char* argv[])              try {                  int telnetport = pt.get<int>("remotecontrol.telnetport");                  RemoteControllerTelnet* telnetrc = new RemoteControllerTelnet(telnetport); -                rc = telnetrc; +                rcs.add_controller(telnetrc);              }              catch (std::exception &e) {                  std::cerr << "Error: " << e.what() << "\n";                  std::cerr << "       telnet remote control enabled, but no telnetport defined.\n"; -                goto END_MAIN; +                throw std::runtime_error("Configuration error"); +            } +        } + +#if defined(HAVE_ZEROMQ) +        if (pt.get("remotecontrol.zmqctrl", 0) == 1) { +            try { +                std::string zmqCtrlEndpoint = pt.get("remotecontrol.zmqctrlendpoint", ""); +                std::cerr << "ZmqCtrlEndpoint: " << zmqCtrlEndpoint << std::endl; +                RemoteControllerZmq* zmqrc = new RemoteControllerZmq(zmqCtrlEndpoint); +                rcs.add_controller(zmqrc); +            } +            catch (std::exception &e) { +                std::cerr << "Error: " << e.what() << "\n"; +                std::cerr << "       zmq remote control enabled, but no endpoint defined.\n"; +                throw std::runtime_error("Configuration error");              }          } +#endif          // input params:          if (pt.get("input.loop", 0) == 1) { @@ -414,6 +394,9 @@ int main(int argc, char* argv[])          }          inputTransport = pt.get("input.transport", "file"); +        inputMaxFramesQueued = pt.get("input.max_frames_queued", +                ZMQ_INPUT_MAX_FRAME_QUEUE); +          inputName = pt.get("input.source", "/dev/stdin");          // log parameters: @@ -430,7 +413,7 @@ int main(int argc, char* argv[])              catch (std::exception &e) {                  std::cerr << "Error: " << e.what() << "\n";                  std::cerr << "       Configuration enables file log, but does not specify log filename\n"; -                goto END_MAIN; +                throw std::runtime_error("Configuration error");              }              LogToFile* log_file = new LogToFile(logfilename); @@ -453,7 +436,7 @@ int main(int argc, char* argv[])              catch (std::exception &e) {                  std::cerr << "Error: " << e.what() << "\n";                  std::cerr << "       Configuration enables firfilter, but does not specify filter taps file\n"; -                goto END_MAIN; +                throw std::runtime_error("Configuration error");              }          } @@ -465,7 +448,7 @@ int main(int argc, char* argv[])          catch (std::exception &e) {              std::cerr << "Error: " << e.what() << "\n";              std::cerr << "       Configuration does not specify output\n"; -            goto END_MAIN; +            throw std::runtime_error("Configuration error");          }          if (output_selected == "file") { @@ -475,9 +458,11 @@ int main(int argc, char* argv[])              catch (std::exception &e) {                  std::cerr << "Error: " << e.what() << "\n";                  std::cerr << "       Configuration does not specify file name for file output\n"; -                goto END_MAIN; +                throw std::runtime_error("Configuration error");              }              useFileOutput = 1; + +            fileOutputFormat = pt.get("fileoutput.format", fileOutputFormat);          }  #if defined(HAVE_OUTPUT_UHD)          else if (output_selected == "uhd") { @@ -499,10 +484,11 @@ int main(int argc, char* argv[])              outputuhd_conf.txgain = pt.get("uhdoutput.txgain", 0.0);              outputuhd_conf.frequency = pt.get<double>("uhdoutput.frequency", 0);              std::string chan = pt.get<std::string>("uhdoutput.channel", ""); +            outputuhd_conf.dabMode = dabMode;              if (outputuhd_conf.frequency == 0 && chan == "") {                  std::cerr << "       UHD output enabled, but neither frequency nor channel defined.\n"; -                goto END_MAIN; +                throw std::runtime_error("Configuration error");              }              else if (outputuhd_conf.frequency == 0) {                  double freq; @@ -546,13 +532,13 @@ int main(int argc, char* argv[])                  else if (chan == "13F") freq = 239200000;                  else {                      std::cerr << "       UHD output: channel " << chan << " does not exist in table\n"; -                    goto END_MAIN; +                    throw std::out_of_range("UHD channel selection error");                  }                  outputuhd_conf.frequency = freq;              }              else if (outputuhd_conf.frequency != 0 && chan != "") {                  std::cerr << "       UHD output: cannot define both frequency and channel.\n"; -                goto END_MAIN; +                throw std::runtime_error("Configuration error");              } @@ -570,21 +556,22 @@ int main(int argc, char* argv[])              }              else {                  std::cerr << "Error: UHD output: behaviour_refclk_lock_lost invalid." << std::endl; -                goto END_MAIN; +                throw std::runtime_error("Configuration error");              }              useUHDOutput = 1;          }  #endif -#if defined(HAVE_OUTPUT_ZEROMQ) +#if defined(HAVE_ZEROMQ)          else if (output_selected == "zmq") {              outputName = pt.get<std::string>("zmqoutput.listen"); +            zmqOutputSocketType = pt.get<std::string>("zmqoutput.socket_type");              useZeroMQOutput = 1;          }  #endif          else {              std::cerr << "Error: Invalid output defined.\n"; -            goto END_MAIN; +            throw std::runtime_error("Configuration error");          }  #if defined(HAVE_OUTPUT_UHD) @@ -607,7 +594,7 @@ int main(int argc, char* argv[])              catch (std::exception &e) {                  std::cerr << "Error: " << e.what() << "\n";                  std::cerr << "       Synchronised transmission enabled, but delay management specification is incomplete.\n"; -                goto END_MAIN; +                throw std::runtime_error("Configuration error");              }          } @@ -615,9 +602,9 @@ int main(int argc, char* argv[])  #endif      } -    if (!rc) { +    if (rcs.get_no_controllers() == 0) {          logger.level(warn) << "No Remote-Control started"; -        rc = new RemoteControllerDummy(); +        rcs.add_controller(new RemoteControllerDummy());      } @@ -661,13 +648,13 @@ int main(int argc, char* argv[])          printUsage(argv[0]);          ret = -1;          logger.level(error) << "Received invalid command line arguments"; -        goto END_MAIN; +        throw std::invalid_argument("Invalid command line options");      }      if (!useFileOutput && !useUHDOutput && !useZeroMQOutput) {          logger.level(error) << "Output not specified";          fprintf(stderr, "Must specify output !"); -        goto END_MAIN; +        throw std::runtime_error("Configuration error");      }      // Print settings @@ -692,8 +679,10 @@ int main(int argc, char* argv[])  #endif      else if (useZeroMQOutput) {          fprintf(stderr, " ZeroMQ\n" -                        "  Listening on: %s\n", -                        outputName.c_str()); +                        "  Listening on: %s\n" +                        "  Socket type : %s\n", +                        outputName.c_str(), +                        zmqOutputSocketType.c_str());      }      fprintf(stderr, "  Sampling rate: "); @@ -713,88 +702,146 @@ int main(int argc, char* argv[])              fprintf(stderr, "Unable to open input file!\n");              logger.level(error) << "Unable to open input file!";              ret = -1; -            goto END_MAIN; +            throw std::runtime_error("Unable to open input");          } -        inputReader = &inputFileReader; +        m.inputReader = &inputFileReader;      }      else if (inputTransport == "zeromq") { -#if !defined(HAVE_INPUT_ZEROMQ) +#if !defined(HAVE_ZEROMQ)          fprintf(stderr, "Error, ZeroMQ input transport selected, but not compiled in!\n");          ret = -1; -        goto END_MAIN; +        throw std::runtime_error("Unable to open input");  #else -        // The URL might start with zmq+tcp:// -        if (inputName.substr(0, 4) == "zmq+") { -            inputZeroMQReader.Open(inputName.substr(4)); -        } -        else { -            inputZeroMQReader.Open(inputName); -        } -        inputReader = &inputZeroMQReader; +        inputZeroMQReader->Open(inputName, inputMaxFramesQueued); +        m.inputReader = inputZeroMQReader.get();  #endif      }      else      {          fprintf(stderr, "Error, invalid input transport %s selected!\n", inputTransport.c_str());          ret = -1; -        goto END_MAIN; +        throw std::runtime_error("Unable to open input");      } -      if (useFileOutput) { -        // Opening COFDM output file -        output = new OutputFile(outputName); +        if (fileOutputFormat == "complexf") { +            output = make_shared<OutputFile>(outputName); +        } +        else if (fileOutputFormat == "s8") { +            // We must normalise the samples to the interval [-127.0; 127.0] +            normalise = 127.0f / normalise_factor; + +            format_converter = make_shared<FormatConverter>(); + +            output = make_shared<OutputFile>(outputName); +        }      }  #if defined(HAVE_OUTPUT_UHD)      else if (useUHDOutput) { - -        /* UHD requires the input I and Q samples to be in the interval -         * [-1.0,1.0], otherwise they get truncated, which creates very -         * wide-spectrum spikes. Depending on the Transmission Mode, the -         * Gain Mode and the sample rate (and maybe other parameters), the -         * samples can have peaks up to about 48000. The value of 50000 -         * should guarantee that with a digital gain of 1.0, UHD never clips -         * our samples. -         */ -        normalise = 1.0f/50000.0f; - +        normalise = 1.0f / normalise_factor;          outputuhd_conf.sampleRate = outputRate; -        try { -            output = new OutputUHD(outputuhd_conf, logger); -            ((OutputUHD*)output)->enrol_at(*rc); -        } -        catch (std::exception& e) { -            logger.level(error) << "UHD initialisation failed:" << e.what(); -            goto END_MAIN; -        } +        output = make_shared<OutputUHD>(outputuhd_conf, &logger); +        ((OutputUHD*)output.get())->enrol_at(rcs);      }  #endif -#if defined(HAVE_OUTPUT_ZEROMQ) +#if defined(HAVE_ZEROMQ)      else if (useZeroMQOutput) {          /* We normalise the same way as for the UHD output */ -        normalise = 1.0f/50000.0f; - -        output = new OutputZeroMQ(outputName); +        normalise = 1.0f / normalise_factor; +        if (zmqOutputSocketType == "pub") { +            output = make_shared<OutputZeroMQ>(outputName, ZMQ_PUB); +        } +        else if (zmqOutputSocketType == "rep") { +            output = make_shared<OutputZeroMQ>(outputName, ZMQ_REP); +        } +        else { +            std::stringstream ss; +            ss << "ZeroMQ output socket type " << zmqOutputSocketType << " invalid"; +            throw std::invalid_argument(ss.str()); +        }      }  #endif -    flowgraph = new Flowgraph(); -    data.setLength(6144); -    input = new InputMemory(&data); -    modulator = new DabModulator(modconf, rc, logger, outputRate, clockRate, -            dabMode, gainMode, digitalgain, normalise, filterTapsFilename); -    flowgraph->connect(input, modulator); -    flowgraph->connect(modulator, output); + +    while (run_again) { +        Flowgraph flowgraph; + +        m.flowgraph = &flowgraph; +        m.data.setLength(6144); + +        shared_ptr<InputMemory> input(new InputMemory(&m.data)); +        shared_ptr<DabModulator> modulator( +                new DabModulator(modconf, &rcs, logger, outputRate, clockRate, +                    dabMode, gainMode, digitalgain, normalise, filterTapsFilename)); + +        flowgraph.connect(input, modulator); +        if (format_converter) { +            flowgraph.connect(modulator, format_converter); +            flowgraph.connect(format_converter, output); +        } +        else { +            flowgraph.connect(modulator, output); +        }  #if defined(HAVE_OUTPUT_UHD) -    if (useUHDOutput) { -        ((OutputUHD*)output)->setETIReader(modulator->getEtiReader()); -    } +        if (useUHDOutput) { +            ((OutputUHD*)output.get())->setETIReader(modulator->getEtiReader()); +        } +#endif + +        m.inputReader->PrintInfo(); + +        run_modulator_state st = run_modulator(logger, m); + +        switch (st) { +            case MOD_FAILURE: +                fprintf(stderr, "\nModulator failure.\n"); +                run_again = false; +                ret = 1; +                break; +#if defined(HAVE_ZEROMQ) +            case MOD_AGAIN: +                fprintf(stderr, "\nRestart modulator\n"); +                running = true; +                if (inputTransport == "zeromq") { +                    run_again = true; + +                    // Create a new input reader +                    inputZeroMQReader = make_shared<InputZeroMQReader>(logger); +                    inputZeroMQReader->Open(inputName, inputMaxFramesQueued); +                    m.inputReader = inputZeroMQReader.get(); +                } +                break;  #endif +            case MOD_NORMAL_END: +            default: +                fprintf(stderr, "\nModulator stopped.\n"); +                ret = 0; +                run_again = false; +                break; +        } + +        fprintf(stderr, "\n\n"); +        fprintf(stderr, "%lu DAB frames encoded\n", m.framecount); +        fprintf(stderr, "%f seconds encoded\n", (float)m.framecount * 0.024f); + +        fprintf(stderr, "\nCleaning flowgraph...\n"); + +        m.data.setLength(0); +    } -    inputReader->PrintInfo(); +    //////////////////////////////////////////////////////////////////////// +    // Cleaning things +    //////////////////////////////////////////////////////////////////////// +    logger.level(info) << "Terminating"; +    return ret; +} + +run_modulator_state run_modulator(Logger& logger, modulator_data& m) +{ +    run_modulator_state ret = MOD_FAILURE;      try {          while (running) { @@ -803,59 +850,68 @@ int main(int argc, char* argv[])              PDEBUG("*****************************************\n");              PDEBUG("* Starting main loop\n");              PDEBUG("*****************************************\n"); -            while ((framesize = inputReader->GetNextFrame(data.getData())) > 0) { +            while ((framesize = m.inputReader->GetNextFrame(m.data.getData())) > 0) {                  if (!running) {                      break;                  } -                frame++; +                m.framecount++;                  PDEBUG("*****************************************\n"); -                PDEBUG("* Read frame %lu\n", frame); +                PDEBUG("* Read frame %lu\n", m.framecount);                  PDEBUG("*****************************************\n");                  //////////////////////////////////////////////////////////////// -                // Proccessing data +                // Processing data                  //////////////////////////////////////////////////////////////// -                flowgraph->run(); +                m.flowgraph->run();                  /* Check every once in a while if the remote control                   * is still working */ -                if (rc && (frame % 250) == 0 && rc->fault_detected()) { -                    fprintf(stderr, -                            "Detected Remote Control fault, restarting it\n"); -                    rc->restart(); +                if (m.rcs->get_no_controllers() > 0 && (m.framecount % 250) == 0) { +                    m.rcs->check_faults();                  }              }              if (framesize == 0) { -                fprintf(stderr, "End of file reached.\n"); +                logger.level(info) << "End of file reached.";              }              else { -                fprintf(stderr, "Input read error.\n"); +                logger.level(error) << "Input read error.";              } -            running = false; +            running = 0; +            ret = MOD_NORMAL_END;          } +#if defined(HAVE_OUTPUT_UHD) +    } catch (fct_discontinuity_error& e) { +        // The OutputUHD saw a FCT discontinuity +        logger.level(warn) << e.what(); +        ret = MOD_AGAIN; +#endif +    } catch (zmq_input_overflow& e) { +        // The ZeroMQ input has overflowed its buffer +        logger.level(warn) << e.what(); +        ret = MOD_AGAIN;      } catch (std::exception& e) { -        fprintf(stderr, "EXCEPTION: %s\n", e.what()); -        ret = -1; +        logger.level(error) << "Exception caught: " << e.what(); +        ret = MOD_FAILURE;      } -END_MAIN: -    //////////////////////////////////////////////////////////////////////// -    // Cleaning things -    //////////////////////////////////////////////////////////////////////// -    fprintf(stderr, "\n\n"); -    fprintf(stderr, "%lu DAB frames encoded\n", frame); -    fprintf(stderr, "%f seconds encoded\n", (float)frame * 0.024f); - -    fprintf(stderr, "\nCleaning flowgraph...\n"); -    delete flowgraph; - -    // Cif -    fprintf(stderr, "\nCleaning buffers...\n"); - -    logger.level(info) << "Terminating"; -      return ret;  } +int main(int argc, char* argv[]) +{ +    try { +        return launch_modulator(argc, argv); +    } +    catch (std::invalid_argument& e) { +        std::string what(e.what()); +        if (not what.empty()) { +            std::cerr << "Modulator error: " << what << std::endl; +        } +    } +    catch (std::runtime_error& e) { +        std::cerr << "Modulator runtime error: " << e.what() << std::endl; +    } +} + diff --git a/src/DabModulator.cpp b/src/DabModulator.cpp index 7f246d8..667d885 100644 --- a/src/DabModulator.cpp +++ b/src/DabModulator.cpp @@ -3,8 +3,10 @@     Her Majesty the Queen in Right of Canada (Communications Research     Center Canada) -   Includes modifications for which no copyright is claimed -   2012, Matthias P. Braendli, matthias.braendli@mpb.li +   Copyright (C) 2015 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://opendigitalradio.org   */  /*     This file is part of ODR-DabMod. @@ -50,10 +52,11 @@  #include "RemoteControl.h"  #include "Log.h" +using namespace boost;  DabModulator::DabModulator(          struct modulator_offset_config& modconf, -        BaseRemoteController* rc, +        RemoteControllers* rcs,          Logger& logger,          unsigned outputRate, unsigned clockRate,          unsigned dabMode, GainMode gainMode, @@ -71,7 +74,7 @@ DabModulator::DabModulator(      myEtiReader(EtiReader(modconf, myLogger)),      myFlowgraph(NULL),      myFilterTapsFilename(filterTapsFilename), -    myRC(rc) +    myRCs(rcs)  {      PDEBUG("DabModulator::DabModulator(%u, %u, %u, %u) @ %p\n",              outputRate, clockRate, dabMode, gainMode, this); @@ -155,62 +158,65 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut)          ////////////////////////////////////////////////////////////////          // CIF data initialisation          //////////////////////////////////////////////////////////////// -        FrameMultiplexer* cifMux = NULL; -        PrbsGenerator* cifPrbs = NULL; -        BlockPartitioner* cifPart = NULL; -        QpskSymbolMapper* cifMap = NULL; -        FrequencyInterleaver* cifFreq = NULL; -        PhaseReference* cifRef = NULL; -        DifferentialModulator* cifDiff = NULL; -        NullSymbol* cifNull = NULL; -        SignalMultiplexer* cifSig = NULL; -        CicEqualizer* cifCicEq = NULL; -        OfdmGenerator* cifOfdm = NULL; -        GainControl* cifGain = NULL; -        GuardIntervalInserter* cifGuard = NULL; -        FIRFilter* cifFilter = NULL; -        Resampler* cifRes = NULL; - -        cifPrbs = new PrbsGenerator(864 * 8, 0x110); -        cifMux = new FrameMultiplexer(myFicSizeOut + 864 * 8, -                &myEtiReader.getSubchannels()); -        cifPart = new BlockPartitioner(mode, myEtiReader.getFp()); -        cifMap = new QpskSymbolMapper(myNbCarriers); -        cifRef = new PhaseReference(mode); -        cifFreq = new FrequencyInterleaver(mode); -        cifDiff = new DifferentialModulator(myNbCarriers); -        cifNull = new NullSymbol(myNbCarriers); -        cifSig = new SignalMultiplexer( -                (1 + myNbSymbols) * myNbCarriers * sizeof(complexf)); - +        shared_ptr<PrbsGenerator> cifPrbs(new PrbsGenerator(864 * 8, 0x110)); +        shared_ptr<FrameMultiplexer> cifMux( +                new FrameMultiplexer(myFicSizeOut + 864 * 8, +                &myEtiReader.getSubchannels())); + +        shared_ptr<BlockPartitioner> cifPart( +                new BlockPartitioner(mode, myEtiReader.getFp())); + +        shared_ptr<QpskSymbolMapper> cifMap(new QpskSymbolMapper(myNbCarriers)); +        shared_ptr<PhaseReference> cifRef(new PhaseReference(mode)); +        shared_ptr<FrequencyInterleaver> cifFreq(new FrequencyInterleaver(mode)); +        shared_ptr<DifferentialModulator> cifDiff( +                new DifferentialModulator(myNbCarriers)); + +        shared_ptr<NullSymbol> cifNull(new NullSymbol(myNbCarriers)); +        shared_ptr<SignalMultiplexer> cifSig(new SignalMultiplexer( +                (1 + myNbSymbols) * myNbCarriers * sizeof(complexf))); + +        // TODO this needs a review +        bool useCicEq = false; +        unsigned cic_ratio = 1;          if (myClockRate) { -            unsigned ratio = myClockRate / myOutputRate; -            ratio /= 4; // FPGA DUC +            cic_ratio = myClockRate / myOutputRate; +            cic_ratio /= 4; // FPGA DUC              if (myClockRate == 400000000) { // USRP2 -                if (ratio & 1) { // odd -                    cifCicEq = new CicEqualizer(myNbCarriers, -                            (float)mySpacing * (float)myOutputRate / 2048000.0f, -                            ratio); +                if (cic_ratio & 1) { // odd +                    useCicEq = true;                  } // even, no filter -            } else { -                cifCicEq = new CicEqualizer(myNbCarriers, -                        (float)mySpacing * (float)myOutputRate / 2048000.0f, -                        ratio); +            } +            else { +                useCicEq = true;              }          } -        cifOfdm = new OfdmGenerator((1 + myNbSymbols), myNbCarriers, mySpacing); -        cifGain = new GainControl(mySpacing, myGainMode, myDigGain, myNormalise); -        cifGain->enrol_at(*myRC); +        shared_ptr<CicEqualizer> cifCicEq(new CicEqualizer(myNbCarriers, +                (float)mySpacing * (float)myOutputRate / 2048000.0f, +                cic_ratio)); + + +        shared_ptr<OfdmGenerator> cifOfdm( +                new OfdmGenerator((1 + myNbSymbols), myNbCarriers, mySpacing)); -        cifGuard = new GuardIntervalInserter(myNbSymbols, mySpacing, -                myNullSize, mySymSize); +        shared_ptr<GainControl> cifGain( +                new GainControl(mySpacing, myGainMode, myDigGain, myNormalise)); + +        cifGain->enrol_at(*myRCs); + +        shared_ptr<GuardIntervalInserter> cifGuard( +                new GuardIntervalInserter(myNbSymbols, mySpacing, +                myNullSize, mySymSize)); + +        FIRFilter* cifFilter = NULL;          if (myFilterTapsFilename != "") {              cifFilter = new FIRFilter(myFilterTapsFilename); -            cifFilter->enrol_at(*myRC); +            cifFilter->enrol_at(*myRCs);          } -        myOutput = new OutputMemory(); +        shared_ptr<OutputMemory> myOutput(new OutputMemory(dataOut)); +        Resampler* cifRes = NULL;          if (myOutputRate != 2048000) {              cifRes = new Resampler(2048000, myOutputRate, mySpacing);          } else { @@ -222,10 +228,7 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut)          ////////////////////////////////////////////////////////////////          // Processing FIC          //////////////////////////////////////////////////////////////// -        FicSource* fic = myEtiReader.getFic(); -        PrbsGenerator* ficPrbs = NULL; -        ConvEncoder* ficConv = NULL; -        PuncturingEncoder* ficPunc = NULL; +        shared_ptr<FicSource> fic(myEtiReader.getFic());          ////////////////////////////////////////////////////////////////          // Data initialisation          //////////////////////////////////////////////////////////////// @@ -241,13 +244,13 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut)          PDEBUG(" Framesize: %zu\n", fic->getFramesize());          // Configuring prbs generator -        ficPrbs = new PrbsGenerator(myFicSizeIn, 0x110); +        shared_ptr<PrbsGenerator> ficPrbs(new PrbsGenerator(myFicSizeIn, 0x110));          // Configuring convolutionnal encoder -        ficConv = new ConvEncoder(myFicSizeIn); +        shared_ptr<ConvEncoder> ficConv(new ConvEncoder(myFicSizeIn));          // Configuring puncturing encoder -        ficPunc = new PuncturingEncoder(); +        shared_ptr<PuncturingEncoder> ficPunc(new PuncturingEncoder());          std::vector<PuncturingRule*> rules = fic->get_rules();          std::vector<PuncturingRule*>::const_iterator rule;          for (rule = rules.begin(); rule != rules.end(); ++rule) { @@ -267,16 +270,12 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut)          ////////////////////////////////////////////////////////////////          // Configuring subchannels          //////////////////////////////////////////////////////////////// -        std::vector<SubchannelSource*> subchannels = +        std::vector<shared_ptr<SubchannelSource> > subchannels =              myEtiReader.getSubchannels(); -        std::vector<SubchannelSource*>::const_iterator subchannel; +        std::vector<shared_ptr<SubchannelSource> >::const_iterator subchannel;          for (subchannel = subchannels.begin();                  subchannel != subchannels.end();                  ++subchannel) { -            PrbsGenerator* subchPrbs = NULL; -            ConvEncoder* subchConv = NULL; -            PuncturingEncoder* subchPunc = NULL; -            TimeInterleaver* subchInterleaver = NULL;              ////////////////////////////////////////////////////////////              // Data initialisation @@ -307,13 +306,17 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut)                      (*subchannel)->protectionOption());              // Configuring prbs genrerator -            subchPrbs = new PrbsGenerator(subchSizeIn, 0x110); +            shared_ptr<PrbsGenerator> subchPrbs( +                    new PrbsGenerator(subchSizeIn, 0x110));              // Configuring convolutionnal encoder -            subchConv = new ConvEncoder(subchSizeIn); +            shared_ptr<ConvEncoder> subchConv( +                    new ConvEncoder(subchSizeIn));              // Configuring puncturing encoder -            subchPunc = new PuncturingEncoder(); +            shared_ptr<PuncturingEncoder> subchPunc( +                    new PuncturingEncoder()); +              std::vector<PuncturingRule*> rules = (*subchannel)->get_rules();              std::vector<PuncturingRule*>::const_iterator rule;              for (rule = rules.begin(); rule != rules.end(); ++rule) { @@ -326,7 +329,8 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut)              subchPunc->append_tail_rule(PuncturingRule(3, 0xcccccc));              // Configuring time interleaver -            subchInterleaver = new TimeInterleaver(subchSizeOut); +            shared_ptr<TimeInterleaver> subchInterleaver( +                    new TimeInterleaver(subchSizeOut));              myFlowgraph->connect(*subchannel, subchPrbs);              myFlowgraph->connect(subchPrbs, subchConv); @@ -342,7 +346,7 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut)          myFlowgraph->connect(cifFreq, cifDiff);          myFlowgraph->connect(cifNull, cifSig);          myFlowgraph->connect(cifDiff, cifSig); -        if (myClockRate) { +        if (useCicEq) {              myFlowgraph->connect(cifSig, cifCicEq);              myFlowgraph->connect(cifCicEq, cifOfdm);          } else { @@ -352,18 +356,21 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut)          myFlowgraph->connect(cifGain, cifGuard);          if (myFilterTapsFilename != "") { -            myFlowgraph->connect(cifGuard, cifFilter); +            shared_ptr<FIRFilter> cifFilterptr(cifFilter); +            myFlowgraph->connect(cifGuard, cifFilterptr);              if (cifRes != NULL) { -                myFlowgraph->connect(cifFilter, cifRes); -                myFlowgraph->connect(cifRes, myOutput); +                shared_ptr<Resampler> res(cifRes); +                myFlowgraph->connect(cifFilterptr, res); +                myFlowgraph->connect(res, myOutput);              } else { -                myFlowgraph->connect(cifFilter, myOutput); +                myFlowgraph->connect(cifFilterptr, myOutput);              }          }          else { //no filtering              if (cifRes != NULL) { -                myFlowgraph->connect(cifGuard, cifRes); -                myFlowgraph->connect(cifRes, myOutput); +                shared_ptr<Resampler> res(cifRes); +                myFlowgraph->connect(cifGuard, res); +                myFlowgraph->connect(res, myOutput);              } else {                  myFlowgraph->connect(cifGuard, myOutput);              } @@ -374,6 +381,6 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut)      ////////////////////////////////////////////////////////////////////      // Proccessing data      //////////////////////////////////////////////////////////////////// -    myOutput->setOutput(dataOut);      return myFlowgraph->run();  } + diff --git a/src/DabModulator.h b/src/DabModulator.h index 21f9f61..89ddd7c 100644 --- a/src/DabModulator.h +++ b/src/DabModulator.h @@ -3,8 +3,10 @@     Her Majesty the Queen in Right of Canada (Communications Research     Center Canada) -   Includes modifications for which no copyright is claimed -   2012, Matthias P. Braendli, matthias.braendli@mpb.li +   Copyright (C) 2015 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://opendigitalradio.org   */  /*     This file is part of ODR-DabMod. @@ -32,6 +34,7 @@  #include <sys/types.h>  #include <string> +#include <boost/shared_ptr.hpp>  #include "ModCodec.h"  #include "EtiReader.h" @@ -47,7 +50,7 @@ class DabModulator : public ModCodec  public:      DabModulator(              struct modulator_offset_config& modconf, -            BaseRemoteController* rc, +            RemoteControllers* rcs,              Logger& logger,              unsigned outputRate = 2048000, unsigned clockRate = 0,              unsigned dabMode = 0, GainMode gainMode = GAIN_VAR, @@ -77,7 +80,7 @@ protected:      Flowgraph* myFlowgraph;      OutputMemory* myOutput;      std::string myFilterTapsFilename; -    BaseRemoteController* myRC; +    RemoteControllers* myRCs;      size_t myNbSymbols;      size_t myNbCarriers; @@ -88,5 +91,5 @@ protected:      size_t myFicSizeIn;  }; -  #endif // DAB_MODULATOR_H + diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp index fe54f55..0e4182d 100644 --- a/src/EtiReader.cpp +++ b/src/EtiReader.cpp @@ -2,7 +2,7 @@     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty     the Queen in Right of Canada (Communications Research Center Canada) -   Copyright (C) 2014 +   Copyright (C) 2014, 2015     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -34,6 +34,7 @@  #include <string.h>  #include <arpa/inet.h> +using namespace boost;  enum ETI_READER_STATE {      EtiReaderStateNbFrame, @@ -60,6 +61,7 @@ EtiReader::EtiReader(struct modulator_offset_config& modconf,      PDEBUG("EtiReader::EtiReader()\n");      myCurrentFrame = 0; +    eti_fc_valid = false;  }  EtiReader::~EtiReader() @@ -69,9 +71,6 @@ EtiReader::~EtiReader()  //    if (myFicSource != NULL) {  //        delete myFicSource;  //    } -//    for (unsigned i = 0; i < mySources.size(); ++i) { -//        delete mySources[i]; -//    }  } @@ -83,23 +82,29 @@ FicSource* EtiReader::getFic()  unsigned EtiReader::getMode()  { +    if (not eti_fc_valid) { +        throw std::runtime_error("Trying to access Mode before it is ready!"); +    }      return eti_fc.MID;  }  unsigned EtiReader::getFp()  { +    if (not eti_fc_valid) { +        throw std::runtime_error("Trying to access FP before it is ready!"); +    }      return eti_fc.FP;  } -const std::vector<SubchannelSource*>& EtiReader::getSubchannels() +const std::vector<boost::shared_ptr<SubchannelSource> >& EtiReader::getSubchannels()  {      return mySources;  } -int EtiReader::process(Buffer* dataIn) +int EtiReader::process(const Buffer* dataIn)  {      PDEBUG("EtiReader::process(dataIn: %p)\n", dataIn);      PDEBUG(" state: %u\n", state); @@ -146,6 +151,7 @@ int EtiReader::process(Buffer* dataIn)                  return dataIn->getLength() - input_size;              }              memcpy(&eti_fc, in, 4); +            eti_fc_valid = true;              input_size -= 4;              framesize -= 4;              in += 4; @@ -171,13 +177,12 @@ int EtiReader::process(Buffer* dataIn)                      (memcmp(&eti_stc[0], in, 4 * eti_fc.NST))) {                  PDEBUG("New stc!\n");                  eti_stc.resize(eti_fc.NST); -                for (unsigned i = 0; i < mySources.size(); ++i) { -                    delete mySources[i]; -                } -                mySources.resize(eti_fc.NST);                  memcpy(&eti_stc[0], in, 4 * eti_fc.NST); + +                mySources.clear();                  for (unsigned i = 0; i < eti_fc.NST; ++i) { -                    mySources[i] = new SubchannelSource(eti_stc[i]); +                    mySources.push_back(shared_ptr<SubchannelSource>( +                                new SubchannelSource(eti_stc[i])));                      PDEBUG("Sstc %u:\n", i);                      PDEBUG(" Stc%i.scid: %i\n", i, eti_stc[i].SCID);                      PDEBUG(" Stc%i.sad: %u\n", i, eti_stc[i].getStartAddress()); diff --git a/src/EtiReader.h b/src/EtiReader.h index 209b208..b893f01 100644 --- a/src/EtiReader.h +++ b/src/EtiReader.h @@ -2,7 +2,7 @@     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty     the Queen in Right of Canada (Communications Research Center Canada) -   Copyright (C) 2014 +   Copyright (C) 2014, 2015     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -41,6 +41,7 @@  #include <vector>  #include <stdint.h>  #include <sys/types.h> +#include <boost/shared_ptr.hpp>  class EtiReader @@ -54,8 +55,8 @@ public:      FicSource* getFic();      unsigned getMode();      unsigned getFp(); -    const std::vector<SubchannelSource*>& getSubchannels(); -    int process(Buffer* dataIn); +    const std::vector<boost::shared_ptr<SubchannelSource> >& getSubchannels(); +    int process(const Buffer* dataIn);      void calculateTimestamp(struct frame_timestamp& ts)      { @@ -83,14 +84,16 @@ protected:      eti_EOF eti_eof;      eti_TIST eti_tist;      FicSource* myFicSource; -    std::vector<SubchannelSource*> mySources; +    std::vector<boost::shared_ptr<SubchannelSource> > mySources;      TimestampDecoder myTimestampDecoder; -     +  private:      size_t myCurrentFrame;      bool time_ext_enabled;      unsigned long timestamp_seconds; +    bool eti_fc_valid;  };  #endif // ETI_READER_H + diff --git a/src/Flowgraph.cpp b/src/Flowgraph.cpp index dd9c68b..3844e86 100644 --- a/src/Flowgraph.cpp +++ b/src/Flowgraph.cpp @@ -1,6 +1,11 @@  /*     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty     the Queen in Right of Canada (Communications Research Center Canada) + +   Copyright (C) 2015 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://opendigitalradio.org   */  /*     This file is part of ODR-DabMod. @@ -38,16 +43,18 @@  #include <sys/time.h>  #endif +using namespace boost; -typedef std::vector<Node*>::iterator NodeIterator; -typedef std::vector<Edge*>::iterator EdgeIterator; +typedef std::vector<shared_ptr<Node> >::iterator NodeIterator; +typedef std::vector<shared_ptr<Edge> >::iterator EdgeIterator; -Node::Node(ModPlugin* plugin) : +Node::Node(shared_ptr<ModPlugin> plugin) :      myPlugin(plugin),      myProcessTime(0)  { -    PDEBUG("Node::Node(plugin(%s): %p) @ %p\n", plugin->name(), plugin, this); +    PDEBUG("Node::Node(plugin(%s): %p) @ %p\n", +            plugin->name(), plugin.get(), this);  } @@ -56,24 +63,21 @@ Node::~Node()  {      PDEBUG("Node::~Node() @ %p\n", this); -    if (myPlugin != NULL) { -        delete myPlugin; -    }      assert(myInputBuffers.size() == 0);      assert(myOutputBuffers.size() == 0);  } -Edge::Edge(Node* srcNode, Node* dstNode) : +Edge::Edge(shared_ptr<Node>& srcNode, shared_ptr<Node>& dstNode) :      mySrcNode(srcNode),      myDstNode(dstNode)  {      PDEBUG("Edge::Edge(srcNode(%s): %p, dstNode(%s): %p) @ %p\n", -            srcNode->plugin()->name(), srcNode, -            dstNode->plugin()->name(), dstNode, +            srcNode->plugin()->name(), srcNode.get(), +            dstNode->plugin()->name(), dstNode.get(),              this); -    myBuffer = new Buffer(); +    myBuffer = shared_ptr<Buffer>(new Buffer());      srcNode->myOutputBuffers.push_back(myBuffer);      dstNode->myInputBuffers.push_back(myBuffer);  } @@ -83,7 +87,7 @@ Edge::~Edge()  {      PDEBUG("Edge::~Edge() @ %p\n", this); -    std::vector<Buffer*>::iterator buffer; +    std::vector<shared_ptr<Buffer> >::iterator buffer;      if (myBuffer != NULL) {          for (buffer = mySrcNode->myOutputBuffers.begin();                  buffer != mySrcNode->myOutputBuffers.end(); @@ -102,7 +106,6 @@ Edge::~Edge()                  break;              }          } -        delete myBuffer;      }  } @@ -110,9 +113,26 @@ Edge::~Edge()  int Node::process()  {      PDEBUG("Edge::process()\n"); -    PDEBUG(" Plugin name: %s (%p)\n", myPlugin->name(), myPlugin); +    PDEBUG(" Plugin name: %s (%p)\n", myPlugin->name(), myPlugin.get()); + +    // the plugin process() still wants vector<Buffer*> +    // arguments. +    std::vector<Buffer*> inBuffers; +    std::vector<shared_ptr<Buffer> >::iterator buffer; +    for (buffer = myInputBuffers.begin(); +         buffer != myInputBuffers.end(); +         ++buffer) { +        inBuffers.push_back(buffer->get()); +    } -    return myPlugin->process(myInputBuffers, myOutputBuffers); +    std::vector<Buffer*> outBuffers; +    for (buffer = myOutputBuffers.begin(); +         buffer != myOutputBuffers.end(); +         ++buffer) { +        outBuffers.push_back(buffer->get()); +    } + +    return myPlugin->process(inBuffers, outBuffers);  } @@ -128,35 +148,26 @@ Flowgraph::~Flowgraph()  {      PDEBUG("Flowgraph::~Flowgraph() @ %p\n", this); -    std::vector<Edge*>::const_iterator edge; -    for (edge = edges.begin(); edge != edges.end(); ++edge) { -        delete *edge; -    } -      if (myProcessTime) {          fprintf(stderr, "Process time:\n"); -    } -    std::vector<Node*>::const_iterator node; -    for (node = nodes.begin(); node != nodes.end(); ++node) { -        if (myProcessTime) { + +        std::vector<shared_ptr<Node> >::const_iterator node; +        for (node = nodes.begin(); node != nodes.end(); ++node) {              fprintf(stderr, "  %30s: %10u us (%2.2f %%)\n",                      (*node)->plugin()->name(),                      (unsigned)(*node)->processTime(),                      (*node)->processTime() * 100.0 / myProcessTime);          } -        delete *node; -    } -    if (myProcessTime) { +          fprintf(stderr, "  %30s: %10u us (100.00 %%)\n", "total",                  (unsigned)myProcessTime);      }  } - -void Flowgraph::connect(ModPlugin* input, ModPlugin* output) +void Flowgraph::connect(shared_ptr<ModPlugin> input, shared_ptr<ModPlugin> output)  {      PDEBUG("Flowgraph::connect(input(%s): %p, output(%s): %p)\n", -            input->name(), input, output->name(), output); +            input->name(), input.get(), output->name(), output.get());      NodeIterator inputNode;      NodeIterator outputNode; @@ -167,7 +178,7 @@ void Flowgraph::connect(ModPlugin* input, ModPlugin* output)          }      }      if (inputNode == nodes.end()) { -        inputNode = nodes.insert(nodes.end(), new Node(input)); +        inputNode = nodes.insert(nodes.end(), shared_ptr<Node>(new Node(input)));      }      for (outputNode = nodes.begin(); outputNode != nodes.end(); ++outputNode) { @@ -176,14 +187,14 @@ void Flowgraph::connect(ModPlugin* input, ModPlugin* output)          }      }      if (outputNode == nodes.end()) { -        outputNode = nodes.insert(nodes.end(), new Node(output)); +        outputNode = nodes.insert(nodes.end(), shared_ptr<Node>(new Node(output)));          for (inputNode = nodes.begin(); inputNode != nodes.end(); ++inputNode) {              if ((*inputNode)->plugin() == input) {                  break;              }          }      } else if (inputNode > outputNode) { -        Node* node = *outputNode; +        shared_ptr<Node> node = *outputNode;          nodes.erase(outputNode);          outputNode = nodes.insert(nodes.end(), node);          for (inputNode = nodes.begin(); inputNode != nodes.end(); ++inputNode) { @@ -196,7 +207,7 @@ void Flowgraph::connect(ModPlugin* input, ModPlugin* output)      assert((*inputNode)->plugin() == input);      assert((*outputNode)->plugin() == output); -    edges.push_back(new Edge(*inputNode, *outputNode)); +    edges.push_back(shared_ptr<Edge>(new Edge(*inputNode, *outputNode)));  } @@ -204,7 +215,7 @@ bool Flowgraph::run()  {      PDEBUG("Flowgraph::run()\n"); -    std::vector<Node*>::const_iterator node; +    std::vector<shared_ptr<Node> >::const_iterator node;      timeval start, stop;      time_t diff; @@ -224,3 +235,4 @@ bool Flowgraph::run()      }      return true;  } + diff --git a/src/Flowgraph.h b/src/Flowgraph.h index 178b6a9..1129668 100644 --- a/src/Flowgraph.h +++ b/src/Flowgraph.h @@ -1,6 +1,11 @@  /*     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty     the Queen in Right of Canada (Communications Research Center Canada) + +   Copyright (C) 2015 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://opendigitalradio.org   */  /*     This file is part of ODR-DabMod. @@ -32,20 +37,21 @@  #include <sys/types.h>  #include <vector> +#include <boost/shared_ptr.hpp>  class Node  {  public: -    Node(ModPlugin* plugin); +    Node(boost::shared_ptr<ModPlugin> plugin);      ~Node();      Node(const Node&);      Node& operator=(const Node&); -    ModPlugin* plugin() { return myPlugin; } +    boost::shared_ptr<ModPlugin> plugin() { return myPlugin; } -    std::vector<Buffer*> myInputBuffers; -    std::vector<Buffer*> myOutputBuffers; +    std::vector<boost::shared_ptr<Buffer> > myInputBuffers; +    std::vector<boost::shared_ptr<Buffer> > myOutputBuffers;      int process();      time_t processTime() { return myProcessTime; } @@ -54,7 +60,7 @@ public:      }  protected: -    ModPlugin* myPlugin; +    boost::shared_ptr<ModPlugin> myPlugin;      time_t myProcessTime;  }; @@ -62,15 +68,15 @@ protected:  class Edge  {  public: -    Edge(Node* src, Node* dst); +    Edge(boost::shared_ptr<Node>& src, boost::shared_ptr<Node>& dst);      ~Edge();      Edge(const Edge&);      Edge& operator=(const Edge&);  protected: -    Node* mySrcNode; -    Node* myDstNode; -    Buffer* myBuffer; +    boost::shared_ptr<Node> mySrcNode; +    boost::shared_ptr<Node> myDstNode; +    boost::shared_ptr<Buffer> myBuffer;  }; @@ -82,14 +88,16 @@ public:      Flowgraph(const Flowgraph&);      Flowgraph& operator=(const Flowgraph&); -    void connect(ModPlugin* input, ModPlugin* output); +    void connect(boost::shared_ptr<ModPlugin> input, +                 boost::shared_ptr<ModPlugin> output);      bool run();  protected: -    std::vector<Node*> nodes; -    std::vector<Edge*> edges; +    std::vector<boost::shared_ptr<Node> > nodes; +    std::vector<boost::shared_ptr<Edge> > edges;      time_t myProcessTime;  };  #endif // FLOWGRAPH_H + diff --git a/src/FormatConverter.cpp b/src/FormatConverter.cpp new file mode 100644 index 0000000..766b6d8 --- /dev/null +++ b/src/FormatConverter.cpp @@ -0,0 +1,103 @@ +/* +   Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty +   the Queen in Right of Canada (Communications Research Center Canada) + +   Copyright (C) 2014 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://opendigitalradio.org + +    This flowgraph block converts complexf to signed integer. + */ +/* +   This file is part of ODR-DabMod. + +   ODR-DabMod is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMod is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMod.  If not, see <http://www.gnu.org/licenses/>. + */ + +#include "FormatConverter.h" +#include "PcDebug.h" + +#include <malloc.h> +#include <sys/types.h> +#include <string.h> +#include <stdexcept> +#include <assert.h> + +#ifdef __SSE__ +#  include <xmmintrin.h> +#endif + +FormatConverter::FormatConverter(void) : +    ModCodec(ModFormat(sizeof(complexf)), +            ModFormat(sizeof(int8_t))) { } + +/* Expect the input samples to be in the range [-255.0, 255.0] */ +int FormatConverter::process(Buffer* const dataIn, Buffer* dataOut) +{ +    PDEBUG("FormatConverter::process(dataIn: %p, dataOut: %p)\n", +            dataIn, dataOut); + +    size_t sizeIn = dataIn->getLength() / sizeof(float); +    dataOut->setLength(sizeIn * sizeof(int8_t)); + +    float* in = reinterpret_cast<float*>(dataIn->getData()); + +#if 0 +    // Disabled because subscripting a __m64 doesn't seem to work +    // on all platforms. + +    /* +      _mm_cvtps_pi8 does: +             |<----------- 128 bits ------------>| +      __m128 |   I1   |   Q1   |   I2   |   Q2   | in float +      __m64  |I1Q1I2Q2|00000000|                   in int8_t +     */ + +    uint32_t* out = reinterpret_cast<uint32_t*>(dataOut->getData()); + +    assert(sizeIn % 16 == 0); +    assert((uintptr_t)in % 16 == 0); +    for(size_t i = 0, j = 0; i < sizeIn; i+=16, j+=4) +    { +        __m128 a1 = _mm_load_ps(in+i+0); +        __m128 a2 = _mm_load_ps(in+i+4); +        __m128 a3 = _mm_load_ps(in+i+8); +        __m128 a4 = _mm_load_ps(in+i+12); +        __m64 b1 = _mm_cvtps_pi8(a1); +        __m64 b2 = _mm_cvtps_pi8(a2); +        __m64 b3 = _mm_cvtps_pi8(a3); +        __m64 b4 = _mm_cvtps_pi8(a4); +        out[j+0]  = b1[0]; +        out[j+1]  = b2[0]; +        out[j+2]  = b3[0]; +        out[j+3]  = b4[0]; +    } +#else +    int8_t* out = reinterpret_cast<int8_t*>(dataOut->getData()); + +    // Slow implementation that uses _ftol() +    for (size_t i = 0; i < sizeIn; i++) { +        out[i] = in[i]; +    } +#endif + +    return 1; +} + +const char* FormatConverter::name() +{ +    return "FormatConverter"; +} + diff --git a/src/FormatConverter.h b/src/FormatConverter.h new file mode 100644 index 0000000..0243685 --- /dev/null +++ b/src/FormatConverter.h @@ -0,0 +1,53 @@ +/* +   Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty +   the Queen in Right of Canada (Communications Research Center Canada) + +   Copyright (C) 2014 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://opendigitalradio.org + +    This flowgraph block converts complexf to signed integer. + */ +/* +   This file is part of ODR-DabMod. + +   ODR-DabMod is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMod is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMod.  If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef FORMAT_CONVERTER_H +#define FORMAT_CONVERTER_H + +#ifdef HAVE_CONFIG_H +#   include <config.h> +#endif + +#include "porting.h" +#include "ModCodec.h" +#include <complex> +#include <stdint.h> + +typedef std::complex<float> complexf; + +class FormatConverter : public ModCodec +{ +    public: +        FormatConverter(void); + +        int process(Buffer* const dataIn, Buffer* dataOut); +        const char* name(); +}; + +#endif // FORMAT_CONVERTER_H + diff --git a/src/FrameMultiplexer.cpp b/src/FrameMultiplexer.cpp index c5e58b7..843f72d 100644 --- a/src/FrameMultiplexer.cpp +++ b/src/FrameMultiplexer.cpp @@ -30,8 +30,11 @@  typedef std::complex<float> complexf; +using namespace boost; -FrameMultiplexer::FrameMultiplexer(size_t framesize, const std::vector<SubchannelSource*>* subchannels) : +FrameMultiplexer::FrameMultiplexer( +        size_t framesize, +        const std::vector<shared_ptr<SubchannelSource> >* subchannels) :      ModMux(ModFormat(framesize), ModFormat(framesize)),      d_frameSize(framesize),      mySubchannels(subchannels) @@ -76,7 +79,7 @@ int FrameMultiplexer::process(std::vector<Buffer*> dataIn, Buffer* dataOut)      ++in;      // Write subchannel      assert(mySubchannels->size() == dataIn.size() - 1); -    std::vector<SubchannelSource*>::const_iterator subchannel = +    std::vector<shared_ptr<SubchannelSource> >::const_iterator subchannel =          mySubchannels->begin();      while (in != dataIn.end()) {          assert((*subchannel)->framesizeCu() * 8 == (*in)->getLength()); @@ -88,3 +91,4 @@ int FrameMultiplexer::process(std::vector<Buffer*> dataIn, Buffer* dataOut)      return dataOut->getLength();  } + diff --git a/src/FrameMultiplexer.h b/src/FrameMultiplexer.h index f1bd587..ba571f6 100644 --- a/src/FrameMultiplexer.h +++ b/src/FrameMultiplexer.h @@ -29,7 +29,7 @@  #include "ModMux.h"  #include "SubchannelSource.h" - +#include <boost/shared_ptr.hpp>  #include <sys/types.h> @@ -37,7 +37,8 @@  class FrameMultiplexer : public ModMux  {  public: -    FrameMultiplexer(size_t frameSize, const std::vector<SubchannelSource*>* subchannels); +    FrameMultiplexer(size_t frameSize, +            const std::vector<boost::shared_ptr<SubchannelSource> >* subchannels);      virtual ~FrameMultiplexer();      FrameMultiplexer(const FrameMultiplexer&);      FrameMultiplexer& operator=(const FrameMultiplexer&); @@ -48,8 +49,8 @@ public:  protected:      size_t d_frameSize; -    const std::vector<SubchannelSource*>* mySubchannels; +    const std::vector<boost::shared_ptr<SubchannelSource> >* mySubchannels;  }; -  #endif // FRAME_MULTIPLEXER_H + diff --git a/src/InputFileReader.cpp b/src/InputFileReader.cpp index 4a7e050..205fbfa 100644 --- a/src/InputFileReader.cpp +++ b/src/InputFileReader.cpp @@ -284,17 +284,17 @@ int InputFileReader::GetNextFrame(void* buffer)      if (read_bytes != frameSize) {          // A short read of a frame (i.e. reading an incomplete frame)          // is not tolerated. Input files must not contain incomplete frames -        if (read_bytes != 0){ -          fprintf(stderr, -                  "Unable to read a complete frame of %u data bytes from input file!\n", -                  frameSize); - -          perror(filename_.c_str()); -          logger_.level(error) << "Unable to read from input file!"; -          return -1; +        if (read_bytes != 0) { +            fprintf(stderr, +                    "Unable to read a complete frame of %u data bytes from input file!\n", +                    frameSize); + +            perror(filename_.c_str()); +            logger_.level(error) << "Unable to read from input file!"; +            return -1;          }          else { -          return 0; +            return 0;          }      } diff --git a/src/InputReader.h b/src/InputReader.h index 164c5ac..13d49b8 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -3,7 +3,7 @@     Her Majesty the Queen in Right of Canada (Communications Research     Center Canada) -   Copyrigth (C) 2013 +   Copyrigth (C) 2013, 2015     Matthias P. Braendli, matthias.braendli@mpb.li   */  /* @@ -31,7 +31,7 @@  #endif  #include <cstdio> -#if defined(HAVE_INPUT_ZEROMQ) +#if defined(HAVE_ZEROMQ)  #  include "zmq.hpp"  #  include "ThreadsafeQueue.h"  #endif @@ -91,9 +91,9 @@ class InputFileReader : public InputReader          ~InputFileReader()          { -            fprintf(stderr, "\nClosing input file...\n"); -              if (inputfile_ != NULL) { +                fprintf(stderr, "\nClosing input file...\n"); +                  fclose(inputfile_);              }          } @@ -130,13 +130,24 @@ class InputFileReader : public InputReader                              // after 2**32 * 24ms ~= 3.3 years  }; -#if defined(HAVE_INPUT_ZEROMQ) +struct zmq_input_overflow : public std::exception +{ +  const char* what () const throw () +  { +    return "InputZMQ buffer overflow"; +  } +}; + +#if defined(HAVE_ZEROMQ)  /* A ZeroMQ input. See www.zeromq.org for more info */  struct InputZeroMQThreadData  {      ThreadsafeQueue<uint8_t*> *in_messages;      std::string uri; +    unsigned max_queued_frames; + +    bool running;  };  class InputZeroMQWorker @@ -172,6 +183,7 @@ class InputZeroMQReader : public InputReader              logger_(logger), in_messages_(10)          {              workerdata_.in_messages = &in_messages_; +            workerdata_.running     = false;          }          ~InputZeroMQReader() @@ -179,7 +191,7 @@ class InputZeroMQReader : public InputReader              worker_.Stop();          } -        int Open(std::string uri); +        int Open(const std::string& uri, unsigned max_queued_frames);          int GetNextFrame(void* buffer); @@ -197,3 +209,4 @@ class InputZeroMQReader : public InputReader  #endif  #endif + diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index cfb56b2..f8c15c4 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -3,7 +3,7 @@     Her Majesty the Queen in Right of Canada (Communications Research     Center Canada) -   Copyright (C) 2013, 2014 +   Copyright (C) 2013, 2014, 2015     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -29,7 +29,7 @@  #   include "config.h"  #endif -#if defined(HAVE_INPUT_ZEROMQ) +#if defined(HAVE_ZEROMQ)  #include <string>  #include <cstring> @@ -41,8 +41,6 @@  #include "InputReader.h"  #include "PcDebug.h" -#define MAX_QUEUE_SIZE 50 -  #define NUM_FRAMES_PER_ZMQ_MESSAGE 4  /* A concatenation of four ETI frames,   * whose maximal size is 6144. @@ -64,10 +62,18 @@ struct zmq_dab_message_t      uint8_t  buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144];  }; -int InputZeroMQReader::Open(std::string uri) +int InputZeroMQReader::Open(const std::string& uri, unsigned max_queued_frames)  { -    uri_ = uri; +    // The URL might start with zmq+tcp:// +    if (uri.substr(0, 4) == "zmq+") { +        uri_ = uri.substr(4); +    } +    else { +        uri_ = uri; +    } +      workerdata_.uri = uri; +    workerdata_.max_queued_frames = max_queued_frames;      // launch receiver thread      worker_.Start(&workerdata_); @@ -81,6 +87,10 @@ int InputZeroMQReader::GetNextFrame(void* buffer)      uint8_t* incoming;      in_messages_.wait_and_pop(incoming); +    if (! workerdata_.running) { +        throw zmq_input_overflow(); +    } +      memcpy(buffer, incoming, framesize);      delete incoming; @@ -123,7 +133,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)                  }                  m_to_drop--;              } -            else if (queue_size < MAX_QUEUE_SIZE) { +            else if (queue_size < workerdata->max_queued_frames) {                  if (buffer_full) {                      fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n",                              queue_size); @@ -175,6 +185,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)                      fprintf(stderr, "ZeroMQ buffer overfull !\n");                      buffer_full = true; +                    throw std::runtime_error("ZMQ input full");                  }                  queue_size = workerdata->in_messages->size(); @@ -188,7 +199,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)              }              if (queue_size < 5) { -                fprintf(stderr, "ZeroMQ buffer underfull: %zu elements !\n", +                fprintf(stderr, "ZeroMQ buffer low: %zu elements !\n",                          queue_size);              }          } @@ -196,15 +207,21 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)      catch (zmq::error_t& err) {          fprintf(stderr, "ZeroMQ error in RecvProcess: '%s'\n", err.what());      } +    catch (std::exception& err) { +    }      fprintf(stderr, "ZeroMQ input worker terminated\n");      subscriber.close(); + +    workerdata->running = false; +    workerdata->in_messages->notify();  }  void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata)  {      running = true; +    workerdata->running = true;      recv_thread = boost::thread(&InputZeroMQWorker::RecvProcess, this, workerdata);  } diff --git a/src/Makefile.am b/src/Makefile.am deleted file mode 100644 index 922ce52..0000000 --- a/src/Makefile.am +++ /dev/null @@ -1,126 +0,0 @@ -# Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012 Her Majesty the -# Queen in Right of Canada (Communications Research Center Canada) - -# This file is part of ODR-DabMod. -# -# ODR-DabMod is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# ODR-DabMod is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -# GNU General Public License for more details. -#  -# You should have received a copy of the GNU General Public License -# along with ODR-DabMod.  If not, see <http://www.gnu.org/licenses/>. - -if IS_GIT_REPO -GITVERSION_FLAGS = -DGITVERSION="\"`git describe`\"" -else -GITVERSION_FLAGS = -endif - -if HAVE_SSE -SIMD_CFLAGS = -msse -msse2 -else -SIMD_CFLAGS = -endif - -bin_PROGRAMS = odr-dabmod - -if USE_KISS_FFT -FFT_DIR=$(top_builddir)/lib/kiss_fft129 -FFT_INC=-I$(FFT_DIR) -I$(FFT_DIR)/tools -FFT_SRC=$(FFT_DIR)/kiss_fft.c \ -		$(FFT_DIR)/kiss_fft.h \ -		$(FFT_DIR)/tools/kiss_fftr.c \ -		$(FFT_DIR)/tools/kiss_fftr.h \ -		kiss_fftsimd.c \ -		kiss_fftsimd.h -FFT_FLG=-ffast-math - -.PHONY: kiss_fft129 reed-solomon-4.0 - -DabModulator.cpp: $(FFT_DIR) - -BUILT_SOURCES: $(FFT_DIR) - -FFT_LDADD= - -$(FFT_DIR): -	if [ ! -e $(FFT_DIR) ]; then \ -		tar xzf $(top_srcdir)/lib/kiss_fft129.tar.gz -C $(top_builddir)/lib; \ -	fi - -else -FFT_LDADD= -FFT_DIR= -FFT_INC= -FFT_SRC= -FFT_FLG= -endif - -odr_dabmod_CPPFLAGS = -Wall \ -					  $(FFT_INC) $(FFT_FLG) $(SIMD_CFLAGS) $(GITVERSION_FLAGS) -odr_dabmod_LDADD    = $(FFT_LDADD) -odr_dabmod_SOURCES  = DabMod.cpp \ -					  PcDebug.h \ -					  porting.c porting.h \ -					  DabModulator.cpp DabModulator.h  \ -					  Buffer.cpp Buffer.h \ -					  ModCodec.cpp ModCodec.h \ -					  ModPlugin.cpp ModPlugin.h \ -					  ModFormat.cpp ModFormat.h \ -					  EtiReader.cpp EtiReader.h \ -					  Eti.cpp Eti.h \ -					  FicSource.cpp FicSource.h \ -					  FIRFilter.cpp FIRFilter.h \ -					  ModInput.cpp ModInput.h \ -					  PuncturingRule.cpp PuncturingRule.h \ -					  PuncturingEncoder.cpp PuncturingEncoder.h \ -					  SubchannelSource.cpp SubchannelSource.h \ -					  Flowgraph.cpp Flowgraph.h \ -					  GainControl.cpp GainControl.h \ -					  OutputMemory.cpp OutputMemory.h \ -					  OutputZeroMQ.cpp OutputZeroMQ.h \ -					  TimestampDecoder.h TimestampDecoder.cpp \ -					  OutputUHD.cpp OutputUHD.h \ -					  ModOutput.cpp ModOutput.h \ -					  InputMemory.cpp InputMemory.h \ -					  InputFileReader.cpp InputZeroMQReader.cpp InputReader.h \ -					  OutputFile.cpp OutputFile.h \ -					  FrameMultiplexer.cpp FrameMultiplexer.h \ -					  ModMux.cpp ModMux.h \ -					  PrbsGenerator.cpp PrbsGenerator.h \ -					  BlockPartitioner.cpp BlockPartitioner.h \ -					  QpskSymbolMapper.cpp QpskSymbolMapper.h \ -					  FrequencyInterleaver.cpp FrequencyInterleaver.h \ -					  PhaseReference.cpp PhaseReference.h \ -					  DifferentialModulator.cpp DifferentialModulator.h \ -					  NullSymbol.cpp NullSymbol.h \ -					  SignalMultiplexer.cpp SignalMultiplexer.h \ -					  CicEqualizer.cpp CicEqualizer.h \ -					  OfdmGenerator.cpp OfdmGenerator.h \ -					  GuardIntervalInserter.cpp GuardIntervalInserter.h \ -					  Resampler.cpp Resampler.h \ -					  ConvEncoder.cpp ConvEncoder.h \ -					  TimeInterleaver.cpp TimeInterleaver.h \ -					  ThreadsafeQueue.h \ -					  Log.cpp Log.h \ -					  RemoteControl.cpp RemoteControl.h \ -					  zmq.hpp - -nodist_odr_dabmod_SOURCES = $(FFT_SRC) - -dist_bin_SCRIPTS = crc-dwap.py - -if USE_KISS_FFT -EXTRA_DIST = kiss_fftsimd.c kiss_fftsimd.h - -clean-local: -	rm -rf $(FFT_DIR) - -endif - diff --git a/src/OutputMemory.h b/src/OutputMemory.h index 2dd49c5..56cbc01 100644 --- a/src/OutputMemory.h +++ b/src/OutputMemory.h @@ -50,7 +50,7 @@  class OutputMemory : public ModOutput  {  public: -    OutputMemory(Buffer* dataOut = NULL); +    OutputMemory(Buffer* dataOut);      virtual ~OutputMemory();      virtual int process(Buffer* dataIn, Buffer* dataOut);      const char* name() { return "OutputMemory"; } diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index 8063e75..dbf8b9d 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -47,8 +47,8 @@ using namespace std;  typedef std::complex<float> complexf;  OutputUHD::OutputUHD( -        OutputUHDConfig& config, -        Logger& logger) : +        const OutputUHDConfig& config, +        Logger *logger) :      ModOutput(ModFormat(1), ModFormat(0)),      RemoteControllable("uhd"),      myLogger(logger), @@ -56,10 +56,12 @@ OutputUHD::OutputUHD(      // Since we don't know the buffer size, we cannot initialise      // the buffers at object initialisation.      first_run(true), -    activebuffer(1) +    activebuffer(1), +    myDelayBuf(0)  {      myMuting = 0; // is remote-controllable +    myStaticDelayUs = 0; // is remote-controllable  #if FAKE_UHD      MDEBUG("OutputUHD:Using fake UHD output"); @@ -87,7 +89,8 @@ OutputUHD::OutputUHD(      /* register the parameters that can be remote controlled */      RC_ADD_PARAMETER(txgain, "UHD analog daughterboard TX gain");      RC_ADD_PARAMETER(freq,   "UHD transmission frequency"); -    RC_ADD_PARAMETER(muting, "mute the output by stopping the transmitter"); +    RC_ADD_PARAMETER(muting, "Mute the output by stopping the transmitter"); +    RC_ADD_PARAMETER(staticdelay, "Set static delay (uS) between 0 and 96000");      uhd::set_thread_priority_safe(); @@ -149,18 +152,18 @@ OutputUHD::OutputUHD(              myConf.muteNoTimestamps ? "enabled" : "disabled");      if (myConf.enableSync && (myConf.pps_src == "none")) { -        myLogger.level(warn) << +        myLogger->level(warn) <<              "OutputUHD: WARNING:"              " you are using synchronous transmission without PPS input!";          struct timespec now;          if (clock_gettime(CLOCK_REALTIME, &now)) {              perror("OutputUHD:Error: could not get time: "); -            myLogger.level(error) << "OutputUHD: could not get time"; +            myLogger->level(error) << "OutputUHD: could not get time";          }          else {              myUsrp->set_time_now(uhd::time_spec_t(now.tv_sec)); -            myLogger.level(info) << "OutputUHD: Setting USRP time to " << +            myLogger->level(info) << "OutputUHD: Setting USRP time to " <<                      uhd::time_spec_t(now.tv_sec).get_real_secs();          }      } @@ -171,7 +174,7 @@ OutputUHD::OutputUHD(          struct timespec now;          time_t seconds;          if (clock_gettime(CLOCK_REALTIME, &now)) { -            myLogger.level(error) << "OutputUHD: could not get time :" << +            myLogger->level(error) << "OutputUHD: could not get time :" <<                  strerror(errno);              throw std::runtime_error("OutputUHD: could not get time.");          } @@ -182,7 +185,7 @@ OutputUHD::OutputUHD(              while (seconds + 1 > now.tv_sec) {                  usleep(1);                  if (clock_gettime(CLOCK_REALTIME, &now)) { -                    myLogger.level(error) << "OutputUHD: could not get time :" << +                    myLogger->level(error) << "OutputUHD: could not get time :" <<                          strerror(errno);                      throw std::runtime_error("OutputUHD: could not get time.");                  } @@ -192,12 +195,12 @@ OutputUHD::OutputUHD(              usleep(200000); // 200ms, we want the PPS to be later              myUsrp->set_time_unknown_pps(uhd::time_spec_t(seconds + 2)); -            myLogger.level(info) << "OutputUHD: Setting USRP time next pps to " << +            myLogger->level(info) << "OutputUHD: Setting USRP time next pps to " <<                      uhd::time_spec_t(seconds + 2).get_real_secs();          }          usleep(1e6); -        myLogger.log(info,  "OutputUHD: USRP time %f\n", +        myLogger->log(info,  "OutputUHD: USRP time %f\n",                  myUsrp->get_time_now().get_real_secs());      } @@ -211,7 +214,7 @@ OutputUHD::OutputUHD(      uwd.sampleRate = myConf.sampleRate;      uwd.sourceContainsTimestamp = false;      uwd.muteNoTimestamps = myConf.muteNoTimestamps; -    uwd.logger = &myLogger; +    uwd.logger = myLogger;      uwd.refclk_lock_loss_behaviour = myConf.refclk_lock_loss_behaviour;      if (myConf.refclk_src == "internal") { @@ -221,13 +224,12 @@ OutputUHD::OutputUHD(          uwd.check_refclk_loss = true;      } +    SetDelayBuffer(config.dabMode);      shared_ptr<barrier> b(new barrier(2));      mySyncBarrier = b;      uwd.sync_barrier = b; -    worker.start(&uwd); -      MDEBUG("OutputUHD:UHD ready.\n");  } @@ -236,6 +238,38 @@ OutputUHD::~OutputUHD()  {      MDEBUG("OutputUHD::~OutputUHD() @ %p\n", this);      worker.stop(); +    if (!first_run) { +        free(uwd.frame0.buf); +        free(uwd.frame1.buf); +    } +} + +void OutputUHD::SetDelayBuffer(unsigned int dabMode) +{ +    // find out the duration of the transmission frame (Table 2 in ETSI 300 401) +    switch (dabMode) { +        case 0: // could happen when called from constructor and we take the mode from ETI +            myTFDurationMs = 0; +            break; +        case 1: +            myTFDurationMs = 96; +            break; +        case 2: +            myTFDurationMs = 24; +            break; +        case 3: +            myTFDurationMs = 24; +            break; +        case 4: +            myTFDurationMs = 48; +            break; +        default: +            throw std::runtime_error("OutputUHD: invalid DAB mode"); +    } +    // The buffer size equals the number of samples per transmission frame so +    // we calculate it by multiplying the duration of the transmission frame +    // with the samplerate. +    myDelayBuf.resize(myTFDurationMs * myConf.sampleRate / 1000);  }  int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) @@ -250,7 +284,9 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)      // We will only wait on the barrier on the subsequent calls to      // OutputUHD::process      if (first_run) { -        myLogger.level(debug) << "OutputUHD: UHD initialising..."; +        myLogger->level(debug) << "OutputUHD: UHD initialising..."; + +        worker.start(&uwd);          uwd.bufsize = dataIn->getLength();          uwd.frame0.buf = malloc(uwd.bufsize); @@ -273,24 +309,42 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)              default: break;          } +        // we only set the delay buffer from the dab mode signaled in ETI if the +        // dab mode was not set in contructor +        if (myTFDurationMs == 0) { +            SetDelayBuffer(myEtiReader->getMode()); +        }          activebuffer = 1;          lastLen = uwd.bufsize;          first_run = false; -        myLogger.level(debug) << "OutputUHD: UHD initialising complete"; +        myLogger->level(debug) << "OutputUHD: UHD initialising complete";      }      else {          if (lastLen != dataIn->getLength()) {              // I expect that this never happens. -            myLogger.level(emerg) << +            myLogger->level(emerg) <<                  "OutputUHD: Fatal error, input length changed from " << lastLen <<                  " to " << dataIn->getLength();              throw std::runtime_error("Non-constant input length!");          }          mySyncBarrier.get()->wait(); +        if (!uwd.running) { +            worker.stop(); +            first_run = true; +            if (uwd.failed_due_to_fct) { +                throw fct_discontinuity_error(); +            } +            else { +                myLogger->level(error) << +                    "OutputUHD: Error, UHD worker failed"; +                throw std::runtime_error("UHD worker failed"); +            } +        } +          // write into the our buffer while          // the worker sends the other. @@ -298,13 +352,30 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)          uwd.sourceContainsTimestamp = myConf.enableSync &&              myEtiReader->sourceContainsTimestamp(); +        // calculate delay +        uint32_t noSampleDelay = (myStaticDelayUs * (myConf.sampleRate / 1000)) / 1000; +        uint32_t noByteDelay = noSampleDelay * sizeof(complexf); + +        uint8_t* pInData = (uint8_t*) dataIn->getData();          if (activebuffer == 0) { -            memcpy(uwd.frame0.buf, dataIn->getData(), uwd.bufsize); +            uint8_t *pTmp = (uint8_t*) uwd.frame0.buf; +            // copy remain from delaybuf +            memcpy(pTmp, &myDelayBuf[0], noByteDelay); +            // copy new data +            memcpy(&pTmp[noByteDelay], pInData, uwd.bufsize - noByteDelay); +            // copy remaining data to delay buf +            memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay);              uwd.frame0.ts = ts;          }          else if (activebuffer == 1) { -            memcpy(uwd.frame1.buf, dataIn->getData(), uwd.bufsize); +            uint8_t *pTmp = (uint8_t*) uwd.frame1.buf; +            // copy remain from delaybuf +            memcpy(pTmp, &myDelayBuf[0], noByteDelay); +            // copy new data +            memcpy(&pTmp[noByteDelay], pInData, uwd.bufsize - noByteDelay); +            // copy remaining data to delay buf +            memcpy(&myDelayBuf[0], &pInData[uwd.bufsize - noByteDelay], noByteDelay);              uwd.frame1.ts = ts;          } @@ -316,6 +387,21 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut)  } +void UHDWorker::process_errhandler() +{ +    try { +        process(); +    } +    catch (fct_discontinuity_error& e) { +        uwd->logger->level(warn) << e.what(); +        uwd->failed_due_to_fct = true; +    } + +    uwd->running = false; +    uwd->sync_barrier.get()->wait(); +    uwd->logger->level(warn) << "UHD worker terminated"; +} +  void UHDWorker::process()  {      int workerbuffer  = 0; @@ -351,7 +437,7 @@ void UHDWorker::process()      int expected_next_fct = -1; -    while (running) { +    while (uwd->running) {          bool fct_discontinuity = false;          md.has_time_spec = false;          md.time_spec = uhd::time_spec_t(0.0); @@ -385,12 +471,19 @@ void UHDWorker::process()          /* Verify that the FCT value is correct. If we miss one transmission           * frame we must interrupt UHD and resync to the timestamps           */ +        if (frame->ts.fct == -1) { +            uwd->logger->level(info) << +                "OutputUHD: dropping one frame with invalid FCT"; +            goto loopend; +        }          if (expected_next_fct != -1) {              if (expected_next_fct != (int)frame->ts.fct) {                  uwd->logger->level(warn) << -                    "OutputUHD: Incorrect expect fct " << frame->ts.fct; +                    "OutputUHD: Incorrect expect fct " << frame->ts.fct << +                    ", expected " << expected_next_fct;                  fct_discontinuity = true; +                throw fct_discontinuity_error();              }          } @@ -437,7 +530,7 @@ void UHDWorker::process()              md.time_spec = uhd::time_spec_t(tx_second, pps_offset);              // md is defined, let's do some checks -            if (md.time_spec.get_real_secs() + 0.2 < usrp_time) { +            if (md.time_spec.get_real_secs() + timeout < usrp_time) {                  uwd->logger->level(warn) <<                      "OutputUHD: Timestamp in the past! offset: " <<                      md.time_spec.get_real_secs() - usrp_time << @@ -448,12 +541,14 @@ void UHDWorker::process()                  goto loopend; //skip the frame              } +#if 0 // Let uhd handle this              if (md.time_spec.get_real_secs() > usrp_time + TIMESTAMP_MARGIN_FUTURE) {                  uwd->logger->level(warn) <<                          "OutputUHD: Timestamp too far in the future! offset: " <<                          md.time_spec.get_real_secs() - usrp_time;                  usleep(20000); //sleep so as to fill buffers              } +#endif              if (md.time_spec.get_real_secs() > usrp_time + TIMESTAMP_ABORT_FUTURE) {                  uwd->logger->level(error) << @@ -493,7 +588,7 @@ void UHDWorker::process()          PDEBUG("UHDWorker::process:max_num_samps: %zu.\n",                  usrp_max_num_samps); -        while (running && !uwd->muting && (num_acc_samps < sizeIn)) { +        while (uwd->running && !uwd->muting && (num_acc_samps < sizeIn)) {              size_t samps_to_send = std::min(sizeIn - num_acc_samps, usrp_max_num_samps);              //ensure the the last packet has EOB set if the timestamps has been @@ -605,8 +700,6 @@ loopend:          // swap buffers          workerbuffer = (workerbuffer + 1) % 2;      } - -    uwd->logger->level(warn) << "UHD worker terminated";  } @@ -627,6 +720,25 @@ void OutputUHD::set_parameter(const string& parameter, const string& value)      else if (parameter == "muting") {          ss >> myMuting;      } +    else if (parameter == "staticdelay") { +        int64_t adjust; +        ss >> adjust; +        if (adjust > (myTFDurationMs * 1000)) +        { // reset static delay for values outside range +            myStaticDelayUs = 0; +        } +        else +        { // the new adjust value is added to the existing delay and the result +            // is wrapped around at TF duration +            int newStaticDelayUs = myStaticDelayUs + adjust; +            if (newStaticDelayUs > (myTFDurationMs * 1000)) +                myStaticDelayUs = newStaticDelayUs - (myTFDurationMs * 1000); +            else if (newStaticDelayUs < 0) +                myStaticDelayUs = newStaticDelayUs + (myTFDurationMs * 1000); +            else +                myStaticDelayUs = newStaticDelayUs; +        } +    }      else {          stringstream ss;          ss << "Parameter '" << parameter @@ -647,6 +759,9 @@ const string OutputUHD::get_parameter(const string& parameter) const      else if (parameter == "muting") {          ss << myMuting;      } +    else if (parameter == "staticdelay") { +        ss << myStaticDelayUs; +    }      else {          ss << "Parameter '" << parameter <<              "' is not exported by controllable " << get_rc_name(); diff --git a/src/OutputUHD.h b/src/OutputUHD.h index a2ffb7d..aed80f6 100644 --- a/src/OutputUHD.h +++ b/src/OutputUHD.h @@ -83,9 +83,20 @@ struct UHDWorkerFrameData {      struct frame_timestamp ts;  }; +struct fct_discontinuity_error : public std::exception +{ +  const char* what () const throw () +  { +    return "FCT discontinuity detected"; +  } +}; +  enum refclk_lock_loss_behaviour_t { CRASH, IGNORE };  struct UHDWorkerData { +    bool running; +    bool failed_due_to_fct; +  #if FAKE_UHD == 0      uhd::usrp::multi_usrp::sptr myUsrp;  #endif @@ -130,28 +141,26 @@ struct UHDWorkerData {  class UHDWorker {      public: -        UHDWorker () { -            running = false; -        } -          void start(struct UHDWorkerData *uhdworkerdata) { -            running = true;              uwd = uhdworkerdata; -            uhd_thread = boost::thread(&UHDWorker::process, this); + +            uwd->running = true; +            uwd->failed_due_to_fct = false; +            uhd_thread = boost::thread(&UHDWorker::process_errhandler, this);          }          void stop() { -            running = false; +            uwd->running = false;              uhd_thread.interrupt();              uhd_thread.join();          } +    private:          void process(); +        void process_errhandler(); -    private:          struct UHDWorkerData *uwd; -        bool running;          boost::thread uhd_thread;          uhd::tx_streamer::sptr myTxStream; @@ -171,6 +180,7 @@ struct OutputUHDConfig {      double txgain;      bool enableSync;      bool muteNoTimestamps; +    unsigned dabMode;      /* allowed values : auto, int, sma, mimo */      std::string refclk_src; @@ -188,9 +198,10 @@ struct OutputUHDConfig {  class OutputUHD: public ModOutput, public RemoteControllable {      public: +          OutputUHD( -                OutputUHDConfig& config, -                Logger& logger); +                const OutputUHDConfig& config, +                Logger *logger);          ~OutputUHD();          int process(Buffer* dataIn, Buffer* dataOut); @@ -216,7 +227,7 @@ class OutputUHD: public ModOutput, public RemoteControllable {      protected: -        Logger& myLogger; +        Logger *myLogger;          EtiReader *myEtiReader;          OutputUHDConfig myConf;          uhd::usrp::multi_usrp::sptr myUsrp; @@ -229,6 +240,15 @@ class OutputUHD: public ModOutput, public RemoteControllable {          // muting can only be changed using the remote control          bool myMuting; +    private: +        // Resize the internal delay buffer according to the dabMode and +        // the sample rate. +        void SetDelayBuffer(unsigned int dabMode); + +        // data +        int myStaticDelayUs; // static delay in microseconds +        int myTFDurationMs; // TF duration in milliseconds +        std::vector<complexf> myDelayBuf;          size_t lastLen;  }; diff --git a/src/OutputZeroMQ.cpp b/src/OutputZeroMQ.cpp index 0e759dd..da4473e 100644 --- a/src/OutputZeroMQ.cpp +++ b/src/OutputZeroMQ.cpp @@ -30,21 +30,33 @@  #include <string.h>  #include <sstream> -#if defined(HAVE_OUTPUT_ZEROMQ) +#if defined(HAVE_ZEROMQ) -OutputZeroMQ::OutputZeroMQ(std::string endpoint, Buffer* dataOut) +OutputZeroMQ::OutputZeroMQ(std::string endpoint, int type, Buffer* dataOut)      : ModOutput(ModFormat(1), ModFormat(0)), +    m_type(type),      m_zmq_context(1), -    m_zmq_pub_sock(m_zmq_context, ZMQ_PUB), +    m_zmq_sock(m_zmq_context, type),      m_endpoint(endpoint)  {      PDEBUG("OutputZeroMQ::OutputZeroMQ(%p) @ %p\n", dataOut, this);      std::stringstream ss; -    ss << "OutputZeroMQ(" << m_endpoint << ")"; +    ss << "OutputZeroMQ(" << m_endpoint << " "; + +    if (type == ZMQ_PUB) { +        ss << "ZMQ_PUB"; +    } +    else if (type == ZMQ_REP) { +        ss << "ZMQ_REP"; +    } +    else { +        throw std::invalid_argument("ZMQ socket type unknown"); +    } +    ss << ")";      m_name = ss.str(); -    m_zmq_pub_sock.bind(m_endpoint.c_str()); +    m_zmq_sock.bind(m_endpoint.c_str());  }  OutputZeroMQ::~OutputZeroMQ() @@ -58,10 +70,16 @@ int OutputZeroMQ::process(Buffer* dataIn, Buffer* dataOut)              "(dataIn: %p, dataOut: %p)\n",              dataIn, dataOut); -    m_zmq_pub_sock.send(dataIn->getData(), dataIn->getLength()); +    if (m_type == ZMQ_REP) { +        // A ZMQ_REP socket requires a request first +        zmq::message_t msg; +        m_zmq_sock.recv(&msg); +    } + +    m_zmq_sock.send(dataIn->getData(), dataIn->getLength());      return dataIn->getLength();  } -#endif // HAVE_OUTPUT_ZEROMQ_H +#endif // HAVE_ZEROMQ diff --git a/src/OutputZeroMQ.h b/src/OutputZeroMQ.h index 1c48fe7..85f85a7 100644 --- a/src/OutputZeroMQ.h +++ b/src/OutputZeroMQ.h @@ -31,7 +31,7 @@  #   include "config.h"  #endif -#if defined(HAVE_OUTPUT_ZEROMQ) +#if defined(HAVE_ZEROMQ)  #include "ModOutput.h"  #include "zmq.hpp" @@ -39,14 +39,15 @@  class OutputZeroMQ : public ModOutput  {      public: -        OutputZeroMQ(std::string endpoint, Buffer* dataOut = NULL); +        OutputZeroMQ(std::string endpoint, int type, Buffer* dataOut = NULL);          virtual ~OutputZeroMQ();          virtual int process(Buffer* dataIn, Buffer* dataOut);          const char* name() { return m_name.c_str(); }      protected: +        int m_type;                   // zmq socket type          zmq::context_t m_zmq_context; // handle for the zmq context -        zmq::socket_t m_zmq_pub_sock; // handle for the zmq publisher socket +        zmq::socket_t m_zmq_sock;     // handle for the zmq publisher socket          std::string m_endpoint;       // On which port to listen: e.g.                                        // tcp://*:58300 @@ -54,7 +55,7 @@ class OutputZeroMQ : public ModOutput          std::string m_name;  }; -#endif // HAVE_OUTPUT_ZEROMQ_H +#endif // HAVE_ZEROMQ  #endif // OUTPUT_ZEROMQ_H diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp index 5bbd2f8..65da3b7 100644 --- a/src/RemoteControl.cpp +++ b/src/RemoteControl.cpp @@ -66,7 +66,8 @@ void RemoteControllerTelnet::process(long)      try {          boost::asio::io_service io_service; -        tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), m_port)); +        tcp::acceptor acceptor(io_service, tcp::endpoint( +                    boost::asio::ip::address::from_string("127.0.0.1"), m_port) );          while (m_running) {              in_message = ""; @@ -246,3 +247,146 @@ void RemoteControllerTelnet::reply(tcp::socket& socket, string message)              ignored_error);  } + +#if defined(HAVE_ZEROMQ) + +void RemoteControllerZmq::restart() +{ +    m_restarter_thread = boost::thread(&RemoteControllerZmq::restart_thread, this); +} + +// This runs in a separate thread, because +// it would take too long to be done in the main loop +// thread. +void RemoteControllerZmq::restart_thread() +{ +    m_running = false; + +    if (!m_endpoint.empty()) { +        m_child_thread.interrupt(); +        m_child_thread.join(); +    } + +    m_child_thread = boost::thread(&RemoteControllerZmq::process, this); +} + +void RemoteControllerZmq::recv_all(zmq::socket_t* pSocket, std::vector<std::string> &message) +{ +    int more = -1; +    size_t more_size = sizeof(more); + +    while (more != 0) +    { +        zmq::message_t msg; +        pSocket->recv(&msg); +        message.push_back(std::string((char*)msg.data(), msg.size())); +        pSocket->getsockopt(ZMQ_RCVMORE, &more, &more_size); +    } +} + +void RemoteControllerZmq::send_ok_reply(zmq::socket_t *pSocket) +{ +    zmq::message_t msg(2); +    char repCode[2] = {'o', 'k'}; +    memcpy ((void*) msg.data(), repCode, 2); +    pSocket->send(msg, 0); +} + +void RemoteControllerZmq::send_fail_reply(zmq::socket_t *pSocket, const std::string &error) +{ +    zmq::message_t msg1(4); +    char repCode[4] = {'f', 'a', 'i', 'l'}; +    memcpy ((void*) msg1.data(), repCode, 4); +    pSocket->send(msg1, ZMQ_SNDMORE); + +    zmq::message_t msg2(error.length()); +    memcpy ((void*) msg2.data(), error.c_str(), error.length()); +    pSocket->send(msg2, 0); +} + +void RemoteControllerZmq::process() +{ +    // create zmq reply socket for receiving ctrl parameters +    zmq::socket_t repSocket(m_zmqContext, ZMQ_REP); +    std::cout << "Starting zmq remote control thread" << std::endl; +    try +    { +        // connect the socket +        int hwm = 100; +        int linger = 0; +        repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); +        repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); +        repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); +        repSocket.bind(m_endpoint.c_str()); + +        // create pollitem that polls the  ZMQ sockets +        zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} }; +        for(;;) +        { +            zmq::poll(pollItems, 1, 100); +            std::vector<std::string> msg; +            if (pollItems[0].revents & ZMQ_POLLIN) +            { +                recv_all(&repSocket, msg); +                std::string command((char*)msg[0].data(), msg[0].size()); + +                if (msg.size() == 1 && command == "ping") +                { +                    send_ok_reply(&repSocket); +                } +                else if (msg.size() == 3 && command == "get") +                { +                    std::string module((char*) msg[1].data(), msg[1].size()); +                    std::string parameter((char*) msg[2].data(), msg[2].size()); + +                    try +                    { +                        std::string value = get_param_(module, parameter); +                        zmq::message_t *pMsg = new zmq::message_t(value.size()); +                        memcpy ((void*) pMsg->data(), value.data(), value.size()); +                        repSocket.send(*pMsg, 0); +                        delete pMsg; +                    } +                    catch (ParameterError &err) +                    { +                        send_fail_reply(&repSocket, err.what()); +                    } +                } +                else if (msg.size() == 4 && command == "set") +                { +                    std::string module((char*) msg[1].data(), msg[1].size()); +                    std::string parameter((char*) msg[2].data(), msg[2].size()); +                    std::string value((char*) msg[3].data(), msg[3].size()); + +                    try +                    { +                        set_param_(module, parameter, value); +                        send_ok_reply(&repSocket); +                    } +                    catch (ParameterError &err) +                    { +                        send_fail_reply(&repSocket, err.what()); +                    } +                } +                else +                    send_fail_reply(&repSocket, "Unsupported command"); +            } + +            // check if thread is interrupted +            boost::this_thread::interruption_point(); +        } +    } +    catch (boost::thread_interrupted&) {} +    catch (zmq::error_t &e) +    { +        std::cerr << "ZMQ error: " << std::string(e.what()) << std::endl; +    } +    catch (std::exception& e) +    { +        std::cerr << "Remote control caught exception: " << e.what() << std::endl; +        m_fault = true; +    } +    repSocket.close(); +} +#endif + diff --git a/src/RemoteControl.h b/src/RemoteControl.h index 09e7492..89a1583 100644 --- a/src/RemoteControl.h +++ b/src/RemoteControl.h @@ -29,6 +29,14 @@  #ifndef _REMOTECONTROL_H  #define _REMOTECONTROL_H +#ifdef HAVE_CONFIG_H +#   include "config.h" +#endif + +#if defined(HAVE_ZEROMQ) +#include <zmq.hpp> +#endif +  #include <list>  #include <map>  #include <string> @@ -85,6 +93,39 @@ class BaseRemoteController {          virtual ~BaseRemoteController() {}  }; +/* Holds all our remote controllers, i.e. we may have more than + * one type of controller running. + */ +class RemoteControllers { +    public: +        void add_controller(BaseRemoteController *rc) { +            m_controllers.push_back(rc); +        } + +        void add_controllable(RemoteControllable *rc) { +            for (std::list<BaseRemoteController*>::iterator it = m_controllers.begin(); +                    it != m_controllers.end(); ++it) { +                (*it)->enrol(rc); +            } +        } + +        void check_faults() { +            for (std::list<BaseRemoteController*>::iterator it = m_controllers.begin(); +                    it != m_controllers.end(); ++it) { +                if ((*it)->fault_detected()) +                { +                    fprintf(stderr, +                            "Detected Remote Control fault, restarting it\n"); +                    (*it)->restart(); +                } +            } +        } +        size_t get_no_controllers() { return m_controllers.size(); } + +    private: +        std::list<BaseRemoteController*> m_controllers; +}; +  /* Objects that support remote control must implement the following class */  class RemoteControllable {      public: @@ -100,8 +141,8 @@ class RemoteControllable {          virtual std::string get_rc_name() const { return m_name; }          /* Tell the controllable to enrol at the given controller */ -        virtual void enrol_at(BaseRemoteController& controller) { -            controller.enrol(this); +        virtual void enrol_at(RemoteControllers& controllers) { +            controllers.add_controllable(this);          }          /* Return a list of possible parameters that can be set */ @@ -254,6 +295,88 @@ class RemoteControllerTelnet : public BaseRemoteController {          int m_port;  }; +#if defined(HAVE_ZEROMQ) +/* Implements a Remote controller using zmq transportlayer + * that listens on localhost + */ +class RemoteControllerZmq : public BaseRemoteController { +    public: +        RemoteControllerZmq() +            : m_running(false), m_fault(false), +            m_zmqContext(1), +            m_endpoint("") { } + +        RemoteControllerZmq(std::string endpoint) +            : m_running(true), m_fault(false), +            m_child_thread(&RemoteControllerZmq::process, this), +            m_zmqContext(1), +            m_endpoint(endpoint) { } + +        ~RemoteControllerZmq() { +            m_running = false; +            m_fault = false; +            if (!m_endpoint.empty()) { +                m_child_thread.interrupt(); +                m_child_thread.join(); +            } +        } + +        void enrol(RemoteControllable* controllable) { +            m_cohort.push_back(controllable); +        } + +        virtual bool fault_detected() { return m_fault; } + +        virtual void restart(); + +    private: +        void restart_thread(); + +        void recv_all(zmq::socket_t* pSocket, std::vector<std::string> &message); +        void send_ok_reply(zmq::socket_t *pSocket); +        void send_fail_reply(zmq::socket_t *pSocket, const std::string &error); +        void process(); + + +        RemoteControllerZmq& operator=(const RemoteControllerZmq& other); +        RemoteControllerZmq(const RemoteControllerZmq& other); + +        RemoteControllable* get_controllable_(std::string name) { +            for (std::list<RemoteControllable*>::iterator it = m_cohort.begin(); +                    it != m_cohort.end(); ++it) { +                if ((*it)->get_rc_name() == name) +                { +                    return *it; +                } +            } +            throw ParameterError("Module name unknown"); +        } + +        std::string get_param_(std::string name, std::string param) { +            RemoteControllable* controllable = get_controllable_(name); +            return controllable->get_parameter(param); +        } + +        void set_param_(std::string name, std::string param, std::string value) { +            RemoteControllable* controllable = get_controllable_(name); +            return controllable->set_parameter(param, value); +        } + +        bool m_running; + +        /* This is set to true if a fault occurred */ +        bool m_fault; +        boost::thread m_restarter_thread; + +        boost::thread m_child_thread; + +        /* This controller commands the controllables in the cohort */ +        std::list<RemoteControllable*> m_cohort; + +        zmq::context_t m_zmqContext; +        std::string m_endpoint; +}; +#endif  /* The Dummy remote controller does nothing, and never fails   */ diff --git a/src/TimestampDecoder.cpp b/src/TimestampDecoder.cpp index 96c84c0..6063048 100644 --- a/src/TimestampDecoder.cpp +++ b/src/TimestampDecoder.cpp @@ -69,7 +69,7 @@ void TimestampDecoder::calculateTimestamp(struct frame_timestamp& ts)          ts.timestamp_sec = 0;          ts.timestamp_pps_offset = 0;          ts.timestamp_refresh = false; -        ts.fct = 0; +        ts.fct = -1;      }      else {          //fprintf(stderr, ". %zu ", queue_timestamps.size()); @@ -191,7 +191,7 @@ void TimestampDecoder::updateTimestampEti(          int framephase,          uint16_t mnsc,          double pps, -        uint32_t fct) +        int32_t fct)  {      updateTimestampPPS(pps);      pushMNSCData(framephase, mnsc); diff --git a/src/TimestampDecoder.h b/src/TimestampDecoder.h index 0c393e4..8c6b362 100644 --- a/src/TimestampDecoder.h +++ b/src/TimestampDecoder.h @@ -55,7 +55,7 @@ struct modulator_offset_config  struct frame_timestamp  {      // Which frame count does this timestamp apply to -    uint32_t fct; +    int32_t fct;      uint32_t timestamp_sec;      double timestamp_pps_offset; @@ -101,9 +101,10 @@ struct frame_timestamp      void print(const char* t)      {          fprintf(stderr, -                "%s <struct frame_timestamp(%s, %d, %.9f)>\n",  +                "%s <struct frame_timestamp(%s, %d, %.9f, %d)>\n",                  t, this->timestamp_valid ? "valid" : "invalid", -                 this->timestamp_sec, this->timestamp_pps_offset); +                 this->timestamp_sec, this->timestamp_pps_offset, +                 this->fct);      }  }; @@ -140,7 +141,7 @@ class TimestampDecoder                  int framephase,                  uint16_t mnsc,                  double pps, -                uint32_t fct); +                int32_t fct);          /* Update the modulator timestamp offset according to the modconf           */ @@ -167,7 +168,7 @@ class TimestampDecoder          struct tm temp_time;          uint32_t time_secs; -        uint32_t latestFCT; +        int32_t latestFCT;          double time_pps;          double timestamp_offset;          int inhibit_second_update; diff --git a/src/Utils.cpp b/src/Utils.cpp new file mode 100644 index 0000000..8b97602 --- /dev/null +++ b/src/Utils.cpp @@ -0,0 +1,121 @@ +/* +   Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012 +   Her Majesty the Queen in Right of Canada (Communications Research +   Center Canada) + +   Copyright (C) 2015 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://opendigitalradio.org + */ +/* +   This file is part of ODR-DabMod. + +   ODR-DabMod is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMod is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMod.  If not, see <http://www.gnu.org/licenses/>. + */ + +#include "Utils.h" +#include "GainControl.h" + +void printUsage(char* progName) +{ +    FILE* out = stderr; + +    fprintf(out, "Welcome to %s %s, compiled at %s, %s\n\n", +            PACKAGE, +#if defined(GITVERSION) +            GITVERSION, +#else +            VERSION, +#endif +            __DATE__, __TIME__); +    fprintf(out, "Usage with configuration file:\n"); +    fprintf(out, "\t%s [-C] config_file.ini\n\n", progName); + +    fprintf(out, "Usage with command line options:\n"); +    fprintf(out, "\t%s" +            " input" +            " (-f filename | -u uhddevice -F frequency) " +            " [-G txgain]" +            " [-o offset]" +            " [-O offsetfile]" +            " [-T filter_taps_file]" +            " [-a gain]" +            " [-c clockrate]" +            " [-g gainMode]" +            " [-h]" +            " [-l]" +            " [-m dabMode]" +            " [-r samplingRate]" +            "\n", progName); +    fprintf(out, "Where:\n"); +    fprintf(out, "input:         ETI input filename (default: stdin).\n"); +    fprintf(out, "-f name:       Use file output with given filename. (use /dev/stdout for standard output)\n"); +    fprintf(out, "-u device:     Use UHD output with given device string. (use "" for default device)\n"); +    fprintf(out, "-F frequency:  Set the transmit frequency when using UHD output. (mandatory option when using UHD)\n"); +    fprintf(out, "-G txgain:     Set the transmit gain for the UHD driver (default: 0)\n"); +    fprintf(out, "-o:            (UHD only) Set the timestamp offset added to the timestamp in the ETI. The offset is a double.\n"); +    fprintf(out, "-O:            (UHD only) Set the file containing the timestamp offset added to the timestamp in the ETI.\n" +                                 "The file is read every six seconds, and must contain a double value.\n"); +    fprintf(out, "                  Specifying either -o or -O has two implications: It enables synchronous transmission,\n" +                 "                  requiring an external REFCLK and PPS signal and frames that do not contain a valid timestamp\n" +                 "                  get muted.\n\n"); +    fprintf(out, "-T taps_file:  Enable filtering before the output, using the specified file containing the filter taps.\n"); +    fprintf(out, "-a gain:       Apply digital amplitude gain.\n"); +    fprintf(out, "-c rate:       Set the DAC clock rate and enable Cic Equalisation.\n"); +    fprintf(out, "-g:            Set computation gain mode: " +            "%u FIX, %u MAX, %u VAR\n", GAIN_FIX, GAIN_MAX, GAIN_VAR); +    fprintf(out, "-h:            Print this help.\n"); +    fprintf(out, "-l:            Loop file when reach end of file.\n"); +    fprintf(out, "-m mode:       Set DAB mode: (0: auto, 1-4: force).\n"); +    fprintf(out, "-r rate:       Set output sampling rate (default: 2048000).\n\n"); +} + + +void printVersion(void) +{ +    FILE *out = stderr; + +    fprintf(out, "Welcome to %s %s, compiled at %s, %s\n\n", +            PACKAGE, VERSION, __DATE__, __TIME__); +    fprintf(out, +            "    ODR-DabMod is copyright (C) Her Majesty the Queen in Right of Canada,\n" +            "    2009, 2010, 2011, 2012 Communications Research Centre (CRC),\n" +            "     and\n" +            "    Copyright (C) 2014, 2015 Matthias P. Braendli, matthias.braendli@mpb.li\n" +            "\n" +            "    http://opendigitalradio.org\n" +            "\n" +            "    ODR-DabMod is free software: you can redistribute it and/or modify it\n" +            "    under the terms of the GNU General Public License as published by the\n" +            "    Free Software Foundation, either version 3 of the License, or (at your\n" +            "    option) any later version.\n" +            "\n" +            "    ODR-DabMod is distributed in the hope that it will be useful, but\n" +            "    WITHOUT ANY WARRANTY; without even the implied warranty of\n" +            "    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU\n" +            "    General Public License for more details.\n" +            "\n" +            "    You should have received a copy of the GNU General Public License along\n" +            "    with ODR-DabMod.  If not, see <http://www.gnu.org/licenses/>.\n" +            "\n" +#if USE_KISS_FFT +            "ODR-DabMod makes use of the following open source packages:\n" +            "    Kiss FFT v1.2.9 (Revised BSD) - http://kissfft.sourceforge.net/\n" +#endif +           ); + +} + + diff --git a/src/Utils.h b/src/Utils.h new file mode 100644 index 0000000..7c3129c --- /dev/null +++ b/src/Utils.h @@ -0,0 +1,44 @@ +/* +   Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012 +   Her Majesty the Queen in Right of Canada (Communications Research +   Center Canada) + +   Copyright (C) 2015 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://opendigitalradio.org + */ +/* +   This file is part of ODR-DabMod. + +   ODR-DabMod is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMod is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMod.  If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __UTILS_H_ +#define __UTILS_H_ + +#ifdef HAVE_CONFIG_H +#   include "config.h" +#endif + +#include <stdlib.h> +#include <unistd.h> +#include <stdio.h> + +void printUsage(char* progName); + +void printVersion(void); + +#endif + | 
