diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Buffer.cpp | 1 | ||||
-rw-r--r-- | src/CicEqualizer.cpp | 12 | ||||
-rw-r--r-- | src/DabMod.cpp | 476 | ||||
-rw-r--r-- | src/DabModulator.cpp | 153 | ||||
-rw-r--r-- | src/DabModulator.h | 14 | ||||
-rw-r--r-- | src/EtiReader.cpp | 43 | ||||
-rw-r--r-- | src/EtiReader.h | 23 | ||||
-rw-r--r-- | src/FIRFilter.cpp | 21 | ||||
-rw-r--r-- | src/FIRFilter.h | 8 | ||||
-rw-r--r-- | src/Flowgraph.cpp | 82 | ||||
-rw-r--r-- | src/Flowgraph.h | 32 | ||||
-rw-r--r-- | src/FrameMultiplexer.cpp | 8 | ||||
-rw-r--r-- | src/FrameMultiplexer.h | 9 | ||||
-rw-r--r-- | src/InputFileReader.cpp | 49 | ||||
-rw-r--r-- | src/InputReader.h | 42 | ||||
-rw-r--r-- | src/InputZeroMQReader.cpp | 75 | ||||
-rw-r--r-- | src/Makefile.am | 127 | ||||
-rw-r--r-- | src/OutputMemory.h | 2 | ||||
-rw-r--r-- | src/OutputUHD.cpp | 880 | ||||
-rw-r--r-- | src/OutputUHD.h | 93 | ||||
-rw-r--r-- | src/OutputZeroMQ.cpp | 28 | ||||
-rw-r--r-- | src/OutputZeroMQ.h | 5 | ||||
-rw-r--r-- | src/RemoteControl.cpp | 6 | ||||
-rw-r--r-- | src/RemoteControl.h | 10 | ||||
-rw-r--r-- | src/ThreadsafeQueue.h | 24 | ||||
-rw-r--r-- | src/TimestampDecoder.cpp | 112 | ||||
-rw-r--r-- | src/TimestampDecoder.h | 84 | ||||
-rw-r--r-- | src/Utils.cpp | 118 | ||||
-rw-r--r-- | src/Utils.h | 62 |
29 files changed, 1463 insertions, 1136 deletions
diff --git a/src/Buffer.cpp b/src/Buffer.cpp index aa0ef4c..fa7f52f 100644 --- a/src/Buffer.cpp +++ b/src/Buffer.cpp @@ -47,6 +47,7 @@ Buffer::Buffer(size_t len, const void *data) Buffer::~Buffer() { + PDEBUG("Buffer::~Buffer() len=%zu, data=%p\n", len, data); free(data); } diff --git a/src/CicEqualizer.cpp b/src/CicEqualizer.cpp index d8eb2ee..a9c0dd6 100644 --- a/src/CicEqualizer.cpp +++ b/src/CicEqualizer.cpp @@ -46,11 +46,12 @@ CicEqualizer::CicEqualizer(size_t nbCarriers, size_t spacing, int R) : float angle = pi * k / spacing; if (k == 0) { myFilter[i] = 1.0f; - } else { - myFilter[i] = sinf(angle / R) / sinf(angle * M); - myFilter[i] = fabsf(myFilter[i]) * R * M; - myFilter[i] = powf(myFilter[i], N); - } + } + else { + myFilter[i] = sinf(angle / R) / sinf(angle * M); + myFilter[i] = fabsf(myFilter[i]) * R * M; + myFilter[i] = powf(myFilter[i], N); + } PDEBUG("HCic[%zu -> %i] = %f (%f dB) -> angle: %f\n", i, k,myFilter[i], 20.0 * log10(myFilter[i]), angle); } @@ -93,3 +94,4 @@ int CicEqualizer::process(Buffer* const dataIn, Buffer* dataOut) return sizeOut; } + diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 214231c..1fc7e3c 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2014 + Copyright (C) 2014, 2015 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -30,7 +30,7 @@ #endif #include "porting.h" - +#include "Utils.h" #include "Log.h" #include "DabModulator.h" #include "InputMemory.h" @@ -46,6 +46,8 @@ #include "FIRFilter.h" #include "RemoteControl.h" +#include <boost/shared_ptr.hpp> +#include <boost/make_shared.hpp> #include <boost/property_tree/ptree.hpp> #include <boost/property_tree/ini_parser.hpp> #include <complex> @@ -68,9 +70,12 @@ # define memalign(a, b) malloc(b) #endif +#define ZMQ_INPUT_MAX_FRAME_QUEUE 500 + typedef std::complex<float> complexf; +using namespace boost; volatile sig_atomic_t running = 1; @@ -81,113 +86,51 @@ void signalHandler(int signalNb) running = 0; } - -void printUsage(char* progName, FILE* out = stderr) +struct modulator_data { - fprintf(out, "Welcome to %s %s, compiled at %s, %s\n\n", - PACKAGE, -#if defined(GITVERSION) - GITVERSION, -#else - VERSION, -#endif - __DATE__, __TIME__); - fprintf(out, "Usage with configuration file:\n"); - fprintf(out, "\t%s [-C] config_file.ini\n\n", progName); - - fprintf(out, "Usage with command line options:\n"); - fprintf(out, "\t%s" - " input" - " (-f filename | -u uhddevice -F frequency) " - " [-G txgain]" - " [-o offset]" - " [-O offsetfile]" - " [-T filter_taps_file]" - " [-a gain]" - " [-c clockrate]" - " [-g gainMode]" - " [-h]" - " [-l]" - " [-m dabMode]" - " [-r samplingRate]" - "\n", progName); - fprintf(out, "Where:\n"); - fprintf(out, "input: ETI input filename (default: stdin).\n"); - fprintf(out, "-f name: Use file output with given filename. (use /dev/stdout for standard output)\n"); - fprintf(out, "-u device: Use UHD output with given device string. (use "" for default device)\n"); - fprintf(out, "-F frequency: Set the transmit frequency when using UHD output. (mandatory option when using UHD)\n"); - fprintf(out, "-G txgain: Set the transmit gain for the UHD driver (default: 0)\n"); - fprintf(out, "-o: (UHD only) Set the timestamp offset added to the timestamp in the ETI. The offset is a double.\n"); - fprintf(out, "-O: (UHD only) Set the file containing the timestamp offset added to the timestamp in the ETI.\n" - "The file is read every six seconds, and must contain a double value.\n"); - fprintf(out, " Specifying either -o or -O has two implications: It enables synchronous transmission,\n" - " requiring an external REFCLK and PPS signal and frames that do not contain a valid timestamp\n" - " get muted.\n\n"); - fprintf(out, "-T taps_file: Enable filtering before the output, using the specified file containing the filter taps.\n"); - fprintf(out, "-a gain: Apply digital amplitude gain.\n"); - fprintf(out, "-c rate: Set the DAC clock rate and enable Cic Equalisation.\n"); - fprintf(out, "-g: Set computation gain mode: " - "%u FIX, %u MAX, %u VAR\n", GAIN_FIX, GAIN_MAX, GAIN_VAR); - fprintf(out, "-h: Print this help.\n"); - fprintf(out, "-l: Loop file when reach end of file.\n"); - fprintf(out, "-m mode: Set DAB mode: (0: auto, 1-4: force).\n"); - fprintf(out, "-r rate: Set output sampling rate (default: 2048000).\n"); -} + modulator_data() : + inputReader(NULL), + framecount(0), + flowgraph(NULL), + rcs(NULL) {} + InputReader* inputReader; + Buffer data; + uint64_t framecount; -void printVersion(FILE *out = stderr) -{ - fprintf(out, "Welcome to %s %s, compiled at %s, %s\n\n", - PACKAGE, VERSION, __DATE__, __TIME__); - fprintf(out, - " ODR-DabMod is copyright (C) Her Majesty the Queen in Right of Canada,\n" - " 2009, 2010, 2011, 2012 Communications Research Centre (CRC),\n" - " and\n" - " Copyright (C) 2014 Matthias P. Braendli, matthias.braendli@mpb.li\n" - "\n" - " http://opendigitalradio.org\n" - "\n" - " This program is available free of charge and is licensed to you on a\n" - " non-exclusive basis; you may not redistribute it.\n" - "\n" - " This program is provided \"AS IS\" in the hope that it will be useful, but\n" - " WITHOUT ANY WARRANTY with respect to its accurancy or usefulness; witout\n" - " even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR\n" - " PURPOSE and NONINFRINGEMENT.\n" - "\n" - " In no event shall CRC be LIABLE for any LOSS, DAMAGE or COST that may be\n" - " incurred in connection with the use of this software.\n" - "\n" -#if USE_KISS_FFT - "ODR-DabMod makes use of the following open source packages:\n" - " Kiss FFT v1.2.9 (Revised BSD) - http://kissfft.sourceforge.net/\n" -#endif - ); + Flowgraph* flowgraph; + RemoteControllers* rcs; +}; -} +enum run_modulator_state { + MOD_FAILURE, + MOD_NORMAL_END, + MOD_AGAIN +}; +run_modulator_state run_modulator(modulator_data& m); -int main(int argc, char* argv[]) +int launch_modulator(int argc, char* argv[]) { int ret = 0; bool loop = false; std::string inputName = ""; std::string inputTransport = "file"; + unsigned inputMaxFramesQueued = ZMQ_INPUT_MAX_FRAME_QUEUE; std::string outputName; int useZeroMQOutput = 0; + std::string zmqOutputSocketType = ""; int useFileOutput = 0; std::string fileOutputFormat = "complexf"; int useUHDOutput = 0; - uint64_t frame = 0; size_t outputRate = 2048000; size_t clockRate = 0; unsigned dabMode = 0; float digitalgain = 1.0f; float normalise = 1.0f; GainMode gainMode = GAIN_VAR; - Buffer data; /* UHD requires the input I and Q samples to be in the interval @@ -211,26 +154,25 @@ int main(int argc, char* argv[]) OutputUHDConfig outputuhd_conf; #endif + modulator_data m; + // To handle the timestamp offset of the modulator - struct modulator_offset_config modconf; - modconf.use_offset_file = false; - modconf.use_offset_fixed = false; - modconf.delay_calculation_pipeline_stages = 0; + unsigned tist_delay_stages = 0; + double tist_offset_s = 0.0; - Flowgraph* flowgraph = NULL; - DabModulator* modulator = NULL; - InputMemory* input = NULL; - FormatConverter* format_converter = NULL; - ModOutput* output = NULL; + shared_ptr<Flowgraph> flowgraph(new Flowgraph()); + shared_ptr<FormatConverter> format_converter; + shared_ptr<ModOutput> output; RemoteControllers rcs; + m.rcs = &rcs; - Logger logger; - InputFileReader inputFileReader(logger); + bool run_again = true; + + InputFileReader inputFileReader; #if defined(HAVE_ZEROMQ) - InputZeroMQReader inputZeroMQReader(logger); + shared_ptr<InputZeroMQReader> inputZeroMQReader(new InputZeroMQReader()); #endif - InputReader* inputReader; struct sigaction sa; memset(&sa, 0, sizeof(struct sigaction)); @@ -271,7 +213,7 @@ int main(int argc, char* argv[]) #if defined(HAVE_OUTPUT_UHD) if (useUHDOutput) { fprintf(stderr, "Options -u and -f are mutually exclusive\n"); - goto END_MAIN; + throw std::invalid_argument("Invalid command line options"); } #endif outputName = optarg; @@ -294,25 +236,7 @@ int main(int argc, char* argv[]) loop = true; break; case 'o': - if (modconf.use_offset_file) - { - fprintf(stderr, "Options -o and -O are mutually exclusive\n"); - goto END_MAIN; - } - modconf.use_offset_fixed = true; - modconf.offset_fixed = strtod(optarg, NULL); -#if defined(HAVE_OUTPUT_UHD) - outputuhd_conf.enableSync = true; -#endif - break; - case 'O': - if (modconf.use_offset_fixed) - { - fprintf(stderr, "Options -o and -O are mutually exclusive\n"); - goto END_MAIN; - } - modconf.use_offset_file = true; - modconf.offset_filename = std::string(optarg); + tist_offset_s = strtod(optarg, NULL); #if defined(HAVE_OUTPUT_UHD) outputuhd_conf.enableSync = true; #endif @@ -330,7 +254,7 @@ int main(int argc, char* argv[]) #if defined(HAVE_OUTPUT_UHD) if (useFileOutput) { fprintf(stderr, "Options -u and -f are mutually exclusive\n"); - goto END_MAIN; + throw std::invalid_argument("Invalid command line options"); } outputuhd_conf.device = optarg; useUHDOutput = 1; @@ -338,17 +262,17 @@ int main(int argc, char* argv[]) break; case 'V': printVersion(); - goto END_MAIN; + throw std::invalid_argument(""); break; case '?': case 'h': printUsage(argv[0]); - goto END_MAIN; + throw std::invalid_argument(""); break; default: fprintf(stderr, "Option '%c' not coded yet!\n", c); ret = -1; - goto END_MAIN; + throw std::invalid_argument("Invalid command line options"); } } @@ -389,7 +313,7 @@ int main(int argc, char* argv[]) // No argument given ? You can't be serious ! Show usage. if (argc == 1) { printUsage(argv[0]); - goto END_MAIN; + throw std::invalid_argument("Invalid command line options"); } // If only one argument is given, interpret as configuration file name @@ -408,8 +332,9 @@ int main(int argc, char* argv[]) } catch (boost::property_tree::ini_parser::ini_parser_error &e) { - fprintf(stderr, "Error, cannot read configuration file '%s'\n", configuration_file.c_str()); - goto END_MAIN; + std::cerr << "Error, cannot read configuration file '" << configuration_file.c_str() << "'" << std::endl; + std::cerr << " " << e.what() << std::endl; + throw std::runtime_error("Cannot read configuration file"); } // remote controller: @@ -422,7 +347,7 @@ int main(int argc, char* argv[]) catch (std::exception &e) { std::cerr << "Error: " << e.what() << "\n"; std::cerr << " telnet remote control enabled, but no telnetport defined.\n"; - goto END_MAIN; + throw std::runtime_error("Configuration error"); } } @@ -437,7 +362,7 @@ int main(int argc, char* argv[]) catch (std::exception &e) { std::cerr << "Error: " << e.what() << "\n"; std::cerr << " zmq remote control enabled, but no endpoint defined.\n"; - goto END_MAIN; + throw std::runtime_error("Configuration error"); } } #endif @@ -448,12 +373,15 @@ int main(int argc, char* argv[]) } inputTransport = pt.get("input.transport", "file"); + inputMaxFramesQueued = pt.get("input.max_frames_queued", + ZMQ_INPUT_MAX_FRAME_QUEUE); + inputName = pt.get("input.source", "/dev/stdin"); // log parameters: if (pt.get("log.syslog", 0) == 1) { LogToSyslog* log_syslog = new LogToSyslog(); - logger.register_backend(log_syslog); + etiLog.register_backend(log_syslog); } if (pt.get("log.filelog", 0) == 1) { @@ -464,11 +392,11 @@ int main(int argc, char* argv[]) catch (std::exception &e) { std::cerr << "Error: " << e.what() << "\n"; std::cerr << " Configuration enables file log, but does not specify log filename\n"; - goto END_MAIN; + throw std::runtime_error("Configuration error"); } LogToFile* log_file = new LogToFile(logfilename); - logger.register_backend(log_file); + etiLog.register_backend(log_file); } @@ -487,7 +415,7 @@ int main(int argc, char* argv[]) catch (std::exception &e) { std::cerr << "Error: " << e.what() << "\n"; std::cerr << " Configuration enables firfilter, but does not specify filter taps file\n"; - goto END_MAIN; + throw std::runtime_error("Configuration error"); } } @@ -499,7 +427,7 @@ int main(int argc, char* argv[]) catch (std::exception &e) { std::cerr << "Error: " << e.what() << "\n"; std::cerr << " Configuration does not specify output\n"; - goto END_MAIN; + throw std::runtime_error("Configuration error"); } if (output_selected == "file") { @@ -509,7 +437,7 @@ int main(int argc, char* argv[]) catch (std::exception &e) { std::cerr << "Error: " << e.what() << "\n"; std::cerr << " Configuration does not specify file name for file output\n"; - goto END_MAIN; + throw std::runtime_error("Configuration error"); } useFileOutput = 1; @@ -535,11 +463,11 @@ int main(int argc, char* argv[]) outputuhd_conf.txgain = pt.get("uhdoutput.txgain", 0.0); outputuhd_conf.frequency = pt.get<double>("uhdoutput.frequency", 0); std::string chan = pt.get<std::string>("uhdoutput.channel", ""); - outputuhd_conf.dabMode = dabMode; + outputuhd_conf.dabMode = dabMode; if (outputuhd_conf.frequency == 0 && chan == "") { std::cerr << " UHD output enabled, but neither frequency nor channel defined.\n"; - goto END_MAIN; + throw std::runtime_error("Configuration error"); } else if (outputuhd_conf.frequency == 0) { double freq; @@ -583,13 +511,13 @@ int main(int argc, char* argv[]) else if (chan == "13F") freq = 239200000; else { std::cerr << " UHD output: channel " << chan << " does not exist in table\n"; - goto END_MAIN; + throw std::out_of_range("UHD channel selection error"); } outputuhd_conf.frequency = freq; } else if (outputuhd_conf.frequency != 0 && chan != "") { std::cerr << " UHD output: cannot define both frequency and channel.\n"; - goto END_MAIN; + throw std::runtime_error("Configuration error"); } @@ -607,44 +535,44 @@ int main(int argc, char* argv[]) } else { std::cerr << "Error: UHD output: behaviour_refclk_lock_lost invalid." << std::endl; - goto END_MAIN; + throw std::runtime_error("Configuration error"); } + outputuhd_conf.maxGPSHoldoverTime = pt.get("uhdoutput.max_gps_holdover_time", 0); + useUHDOutput = 1; } #endif #if defined(HAVE_ZEROMQ) else if (output_selected == "zmq") { outputName = pt.get<std::string>("zmqoutput.listen"); + zmqOutputSocketType = pt.get<std::string>("zmqoutput.socket_type"); useZeroMQOutput = 1; } #endif else { std::cerr << "Error: Invalid output defined.\n"; - goto END_MAIN; + throw std::runtime_error("Configuration error"); } #if defined(HAVE_OUTPUT_UHD) outputuhd_conf.enableSync = (pt.get("delaymanagement.synchronous", 0) == 1); if (outputuhd_conf.enableSync) { + std::string delay_mgmt = pt.get<std::string>("delaymanagement.management", ""); + std::string fixedoffset = pt.get<std::string>("delaymanagement.fixedoffset", ""); + std::string offset_filename = pt.get<std::string>("delaymanagement.dynamicoffsetfile", ""); + + if (not(delay_mgmt.empty() and fixedoffset.empty() and offset_filename.empty())) { + std::cerr << "Warning: you are using the old config syntax for the offset management.\n"; + std::cerr << " Please see the example.ini configuration for the new settings.\n"; + } + try { - std::string delay_mgmt = pt.get<std::string>("delaymanagement.management"); - if (delay_mgmt == "fixed") { - modconf.offset_fixed = pt.get<double>("delaymanagement.fixedoffset"); - modconf.use_offset_fixed = true; - } - else if (delay_mgmt == "dynamic") { - modconf.offset_filename = pt.get<std::string>("delaymanagement.dynamicoffsetfile"); - modconf.use_offset_file = true; - } - else { - throw std::runtime_error("invalid management value"); - } + tist_offset_s = pt.get<double>("delaymanagement.offset"); } catch (std::exception &e) { - std::cerr << "Error: " << e.what() << "\n"; - std::cerr << " Synchronised transmission enabled, but delay management specification is incomplete.\n"; - goto END_MAIN; + std::cerr << "Error: delaymanagement: synchronous is enabled, but no offset defined!\n"; + throw std::runtime_error("Configuration error"); } } @@ -653,23 +581,17 @@ int main(int argc, char* argv[]) } if (rcs.get_no_controllers() == 0) { - logger.level(warn) << "No Remote-Control started"; + etiLog.level(warn) << "No Remote-Control started"; rcs.add_controller(new RemoteControllerDummy()); } - logger.level(info) << "Starting up"; - - if (!(modconf.use_offset_file || modconf.use_offset_fixed)) { - logger.level(debug) << "No Modulator offset defined, setting to 0"; - modconf.use_offset_fixed = true; - modconf.offset_fixed = 0; - } + etiLog.level(info) << "Starting up"; // When using the FIRFilter, increase the modulator offset pipelining delay // by the correct amount if (filterTapsFilename != "") { - modconf.delay_calculation_pipeline_stages += FIRFILTER_PIPELINE_DELAY; + tist_delay_stages += FIRFILTER_PIPELINE_DELAY; } // Setting ETI input filename @@ -697,14 +619,14 @@ int main(int argc, char* argv[]) fprintf(stderr, "\n"); printUsage(argv[0]); ret = -1; - logger.level(error) << "Received invalid command line arguments"; - goto END_MAIN; + etiLog.level(error) << "Received invalid command line arguments"; + throw std::invalid_argument("Invalid command line options"); } if (!useFileOutput && !useUHDOutput && !useZeroMQOutput) { - logger.level(error) << "Output not specified"; + etiLog.level(error) << "Output not specified"; fprintf(stderr, "Must specify output !"); - goto END_MAIN; + throw std::runtime_error("Configuration error"); } // Print settings @@ -721,16 +643,22 @@ int main(int argc, char* argv[]) fprintf(stderr, " UHD\n" " Device: %s\n" " Type: %s\n" - " master_clock_rate: %ld\n", + " master_clock_rate: %ld\n" + " refclk: %s\n" + " pps source: %s\n", outputuhd_conf.device.c_str(), outputuhd_conf.usrpType.c_str(), - outputuhd_conf.masterClockRate); + outputuhd_conf.masterClockRate, + outputuhd_conf.refclk_src.c_str(), + outputuhd_conf.pps_src.c_str()); } #endif else if (useZeroMQOutput) { fprintf(stderr, " ZeroMQ\n" - " Listening on: %s\n", - outputName.c_str()); + " Listening on: %s\n" + " Socket type : %s\n", + outputName.c_str(), + zmqOutputSocketType.c_str()); } fprintf(stderr, " Sampling rate: "); @@ -748,96 +676,147 @@ int main(int argc, char* argv[]) // Opening ETI input file if (inputFileReader.Open(inputName, loop) == -1) { fprintf(stderr, "Unable to open input file!\n"); - logger.level(error) << "Unable to open input file!"; + etiLog.level(error) << "Unable to open input file!"; ret = -1; - goto END_MAIN; + throw std::runtime_error("Unable to open input"); } - inputReader = &inputFileReader; + m.inputReader = &inputFileReader; } else if (inputTransport == "zeromq") { #if !defined(HAVE_ZEROMQ) fprintf(stderr, "Error, ZeroMQ input transport selected, but not compiled in!\n"); ret = -1; - goto END_MAIN; + throw std::runtime_error("Unable to open input"); #else - // The URL might start with zmq+tcp:// - if (inputName.substr(0, 4) == "zmq+") { - inputZeroMQReader.Open(inputName.substr(4)); - } - else { - inputZeroMQReader.Open(inputName); - } - inputReader = &inputZeroMQReader; + inputZeroMQReader->Open(inputName, inputMaxFramesQueued); + m.inputReader = inputZeroMQReader.get(); #endif } else { fprintf(stderr, "Error, invalid input transport %s selected!\n", inputTransport.c_str()); ret = -1; - goto END_MAIN; + throw std::runtime_error("Unable to open input"); } if (useFileOutput) { if (fileOutputFormat == "complexf") { - output = new OutputFile(outputName); + output = make_shared<OutputFile>(outputName); } else if (fileOutputFormat == "s8") { // We must normalise the samples to the interval [-127.0; 127.0] normalise = 127.0f / normalise_factor; - format_converter = new FormatConverter(); + format_converter = make_shared<FormatConverter>(); - output = new OutputFile(outputName); + output = make_shared<OutputFile>(outputName); } } #if defined(HAVE_OUTPUT_UHD) else if (useUHDOutput) { - normalise = 1.0f / normalise_factor; - outputuhd_conf.sampleRate = outputRate; - try { - output = new OutputUHD(outputuhd_conf, logger); - ((OutputUHD*)output)->enrol_at(rcs); - } - catch (std::exception& e) { - logger.level(error) << "UHD initialisation failed:" << e.what(); - goto END_MAIN; - } + output = make_shared<OutputUHD>(outputuhd_conf); + ((OutputUHD*)output.get())->enrol_at(rcs); } #endif #if defined(HAVE_ZEROMQ) else if (useZeroMQOutput) { /* We normalise the same way as for the UHD output */ normalise = 1.0f / normalise_factor; - - output = new OutputZeroMQ(outputName); + if (zmqOutputSocketType == "pub") { + output = make_shared<OutputZeroMQ>(outputName, ZMQ_PUB); + } + else if (zmqOutputSocketType == "rep") { + output = make_shared<OutputZeroMQ>(outputName, ZMQ_REP); + } + else { + std::stringstream ss; + ss << "ZeroMQ output socket type " << zmqOutputSocketType << " invalid"; + throw std::invalid_argument(ss.str()); + } } #endif - flowgraph = new Flowgraph(); - data.setLength(6144); - input = new InputMemory(&data); - modulator = new DabModulator(modconf, &rcs, logger, outputRate, clockRate, - dabMode, gainMode, digitalgain, normalise, filterTapsFilename); - flowgraph->connect(input, modulator); - if (format_converter) { - flowgraph->connect(modulator, format_converter); - flowgraph->connect(format_converter, output); - } - else { - flowgraph->connect(modulator, output); - } + + while (run_again) { + Flowgraph flowgraph; + + m.flowgraph = &flowgraph; + m.data.setLength(6144); + + shared_ptr<InputMemory> input(new InputMemory(&m.data)); + shared_ptr<DabModulator> modulator( + new DabModulator(tist_offset_s, tist_delay_stages, &rcs, + outputRate, clockRate, dabMode, gainMode, digitalgain, + normalise, filterTapsFilename)); + + flowgraph.connect(input, modulator); + if (format_converter) { + flowgraph.connect(modulator, format_converter); + flowgraph.connect(format_converter, output); + } + else { + flowgraph.connect(modulator, output); + } #if defined(HAVE_OUTPUT_UHD) - if (useUHDOutput) { - ((OutputUHD*)output)->setETIReader(modulator->getEtiReader()); - } + if (useUHDOutput) { + ((OutputUHD*)output.get())->setETIReader(modulator->getEtiReader()); + } +#endif + + m.inputReader->PrintInfo(); + + run_modulator_state st = run_modulator(m); + + switch (st) { + case MOD_FAILURE: + etiLog.level(error) << "Modulator failure."; + run_again = false; + ret = 1; + break; +#if defined(HAVE_ZEROMQ) + case MOD_AGAIN: + etiLog.level(warn) << "Restart modulator."; + running = true; + if (inputTransport == "zeromq") { + run_again = true; + + // Create a new input reader + inputZeroMQReader = make_shared<InputZeroMQReader>(); + inputZeroMQReader->Open(inputName, inputMaxFramesQueued); + m.inputReader = inputZeroMQReader.get(); + } + break; #endif + case MOD_NORMAL_END: + default: + etiLog.level(info) << "modulator stopped."; + ret = 0; + run_again = false; + break; + } - inputReader->PrintInfo(); + fprintf(stderr, "\n\n"); + etiLog.level(info) << m.framecount << " DAB frames encoded"; + etiLog.level(info) << ((float)m.framecount * 0.024f) << " seconds encoded"; + + m.data.setLength(0); + } + + //////////////////////////////////////////////////////////////////////// + // Cleaning things + //////////////////////////////////////////////////////////////////////// + etiLog.level(info) << "Terminating"; + return ret; +} + +run_modulator_state run_modulator(modulator_data& m) +{ + run_modulator_state ret = MOD_FAILURE; try { while (running) { @@ -846,57 +825,68 @@ int main(int argc, char* argv[]) PDEBUG("*****************************************\n"); PDEBUG("* Starting main loop\n"); PDEBUG("*****************************************\n"); - while ((framesize = inputReader->GetNextFrame(data.getData())) > 0) { + while ((framesize = m.inputReader->GetNextFrame(m.data.getData())) > 0) { if (!running) { break; } - frame++; + m.framecount++; PDEBUG("*****************************************\n"); - PDEBUG("* Read frame %lu\n", frame); + PDEBUG("* Read frame %lu\n", m.framecount); PDEBUG("*****************************************\n"); //////////////////////////////////////////////////////////////// - // Proccessing data + // Processing data //////////////////////////////////////////////////////////////// - flowgraph->run(); + m.flowgraph->run(); /* Check every once in a while if the remote control * is still working */ - if (rcs.get_no_controllers() > 0 && (frame % 250) == 0) { - rcs.check_faults(); + if (m.rcs->get_no_controllers() > 0 && (m.framecount % 250) == 0) { + m.rcs->check_faults(); } } if (framesize == 0) { - fprintf(stderr, "End of file reached.\n"); + etiLog.level(info) << "End of file reached."; } else { - fprintf(stderr, "Input read error.\n"); + etiLog.level(error) << "Input read error."; } running = 0; + ret = MOD_NORMAL_END; } +#if defined(HAVE_OUTPUT_UHD) + } catch (fct_discontinuity_error& e) { + // The OutputUHD saw a FCT discontinuity + etiLog.level(warn) << e.what(); + ret = MOD_AGAIN; +#endif + } catch (zmq_input_overflow& e) { + // The ZeroMQ input has overflowed its buffer + etiLog.level(warn) << e.what(); + ret = MOD_AGAIN; } catch (std::exception& e) { - fprintf(stderr, "EXCEPTION: %s\n", e.what()); - ret = -1; + etiLog.level(error) << "Exception caught: " << e.what(); + ret = MOD_FAILURE; } -END_MAIN: - //////////////////////////////////////////////////////////////////////// - // Cleaning things - //////////////////////////////////////////////////////////////////////// - fprintf(stderr, "\n\n"); - fprintf(stderr, "%lu DAB frames encoded\n", frame); - fprintf(stderr, "%f seconds encoded\n", (float)frame * 0.024f); - - fprintf(stderr, "\nCleaning flowgraph...\n"); - delete flowgraph; - - // Cif - fprintf(stderr, "\nCleaning buffers...\n"); - - logger.level(info) << "Terminating"; - return ret; } +int main(int argc, char* argv[]) +{ + try { + return launch_modulator(argc, argv); + } + catch (std::invalid_argument& e) { + std::string what(e.what()); + if (not what.empty()) { + std::cerr << "Modulator error: " << what << std::endl; + } + } + catch (std::runtime_error& e) { + std::cerr << "Modulator runtime error: " << e.what() << std::endl; + } +} + diff --git a/src/DabModulator.cpp b/src/DabModulator.cpp index 2664a08..35ef7cb 100644 --- a/src/DabModulator.cpp +++ b/src/DabModulator.cpp @@ -3,8 +3,10 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Includes modifications for which no copyright is claimed - 2012, Matthias P. Braendli, matthias.braendli@mpb.li + Copyright (C) 2015 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://opendigitalradio.org */ /* This file is part of ODR-DabMod. @@ -50,25 +52,24 @@ #include "RemoteControl.h" #include "Log.h" +using namespace boost; DabModulator::DabModulator( - struct modulator_offset_config& modconf, + double tist_offset_s, unsigned tist_delay_stages, RemoteControllers* rcs, - Logger& logger, unsigned outputRate, unsigned clockRate, unsigned dabMode, GainMode gainMode, float digGain, float normalise, std::string filterTapsFilename ) : ModCodec(ModFormat(1), ModFormat(0)), - myLogger(logger), myOutputRate(outputRate), myClockRate(clockRate), myDabMode(dabMode), myGainMode(gainMode), myDigGain(digGain), myNormalise(normalise), - myEtiReader(EtiReader(modconf, myLogger)), + myEtiReader(EtiReader(tist_offset_s, tist_delay_stages, rcs)), myFlowgraph(NULL), myFilterTapsFilename(filterTapsFilename), myRCs(rcs) @@ -155,62 +156,65 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut) //////////////////////////////////////////////////////////////// // CIF data initialisation //////////////////////////////////////////////////////////////// - FrameMultiplexer* cifMux = NULL; - PrbsGenerator* cifPrbs = NULL; - BlockPartitioner* cifPart = NULL; - QpskSymbolMapper* cifMap = NULL; - FrequencyInterleaver* cifFreq = NULL; - PhaseReference* cifRef = NULL; - DifferentialModulator* cifDiff = NULL; - NullSymbol* cifNull = NULL; - SignalMultiplexer* cifSig = NULL; - CicEqualizer* cifCicEq = NULL; - OfdmGenerator* cifOfdm = NULL; - GainControl* cifGain = NULL; - GuardIntervalInserter* cifGuard = NULL; - FIRFilter* cifFilter = NULL; - Resampler* cifRes = NULL; - - cifPrbs = new PrbsGenerator(864 * 8, 0x110); - cifMux = new FrameMultiplexer(myFicSizeOut + 864 * 8, - &myEtiReader.getSubchannels()); - cifPart = new BlockPartitioner(mode, myEtiReader.getFp()); - cifMap = new QpskSymbolMapper(myNbCarriers); - cifRef = new PhaseReference(mode); - cifFreq = new FrequencyInterleaver(mode); - cifDiff = new DifferentialModulator(myNbCarriers); - cifNull = new NullSymbol(myNbCarriers); - cifSig = new SignalMultiplexer( - (1 + myNbSymbols) * myNbCarriers * sizeof(complexf)); - + shared_ptr<PrbsGenerator> cifPrbs(new PrbsGenerator(864 * 8, 0x110)); + shared_ptr<FrameMultiplexer> cifMux( + new FrameMultiplexer(myFicSizeOut + 864 * 8, + &myEtiReader.getSubchannels())); + + shared_ptr<BlockPartitioner> cifPart( + new BlockPartitioner(mode, myEtiReader.getFp())); + + shared_ptr<QpskSymbolMapper> cifMap(new QpskSymbolMapper(myNbCarriers)); + shared_ptr<PhaseReference> cifRef(new PhaseReference(mode)); + shared_ptr<FrequencyInterleaver> cifFreq(new FrequencyInterleaver(mode)); + shared_ptr<DifferentialModulator> cifDiff( + new DifferentialModulator(myNbCarriers)); + + shared_ptr<NullSymbol> cifNull(new NullSymbol(myNbCarriers)); + shared_ptr<SignalMultiplexer> cifSig(new SignalMultiplexer( + (1 + myNbSymbols) * myNbCarriers * sizeof(complexf))); + + // TODO this needs a review + bool useCicEq = false; + unsigned cic_ratio = 1; if (myClockRate) { - unsigned ratio = myClockRate / myOutputRate; - ratio /= 4; // FPGA DUC + cic_ratio = myClockRate / myOutputRate; + cic_ratio /= 4; // FPGA DUC if (myClockRate == 400000000) { // USRP2 - if (ratio & 1) { // odd - cifCicEq = new CicEqualizer(myNbCarriers, - (float)mySpacing * (float)myOutputRate / 2048000.0f, - ratio); + if (cic_ratio & 1) { // odd + useCicEq = true; } // even, no filter - } else { - cifCicEq = new CicEqualizer(myNbCarriers, - (float)mySpacing * (float)myOutputRate / 2048000.0f, - ratio); + } + else { + useCicEq = true; } } - cifOfdm = new OfdmGenerator((1 + myNbSymbols), myNbCarriers, mySpacing); - cifGain = new GainControl(mySpacing, myGainMode, myDigGain, myNormalise); + shared_ptr<CicEqualizer> cifCicEq(new CicEqualizer(myNbCarriers, + (float)mySpacing * (float)myOutputRate / 2048000.0f, + cic_ratio)); + + + shared_ptr<OfdmGenerator> cifOfdm( + new OfdmGenerator((1 + myNbSymbols), myNbCarriers, mySpacing)); + + shared_ptr<GainControl> cifGain( + new GainControl(mySpacing, myGainMode, myDigGain, myNormalise)); + cifGain->enrol_at(*myRCs); - cifGuard = new GuardIntervalInserter(myNbSymbols, mySpacing, - myNullSize, mySymSize); + shared_ptr<GuardIntervalInserter> cifGuard( + new GuardIntervalInserter(myNbSymbols, mySpacing, + myNullSize, mySymSize)); + + FIRFilter* cifFilter = NULL; if (myFilterTapsFilename != "") { cifFilter = new FIRFilter(myFilterTapsFilename); cifFilter->enrol_at(*myRCs); } - myOutput = new OutputMemory(); + shared_ptr<OutputMemory> myOutput(new OutputMemory(dataOut)); + Resampler* cifRes = NULL; if (myOutputRate != 2048000) { cifRes = new Resampler(2048000, myOutputRate, mySpacing); } else { @@ -222,10 +226,7 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut) //////////////////////////////////////////////////////////////// // Processing FIC //////////////////////////////////////////////////////////////// - FicSource* fic = myEtiReader.getFic(); - PrbsGenerator* ficPrbs = NULL; - ConvEncoder* ficConv = NULL; - PuncturingEncoder* ficPunc = NULL; + shared_ptr<FicSource> fic(myEtiReader.getFic()); //////////////////////////////////////////////////////////////// // Data initialisation //////////////////////////////////////////////////////////////// @@ -241,13 +242,13 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut) PDEBUG(" Framesize: %zu\n", fic->getFramesize()); // Configuring prbs generator - ficPrbs = new PrbsGenerator(myFicSizeIn, 0x110); + shared_ptr<PrbsGenerator> ficPrbs(new PrbsGenerator(myFicSizeIn, 0x110)); // Configuring convolutionnal encoder - ficConv = new ConvEncoder(myFicSizeIn); + shared_ptr<ConvEncoder> ficConv(new ConvEncoder(myFicSizeIn)); // Configuring puncturing encoder - ficPunc = new PuncturingEncoder(); + shared_ptr<PuncturingEncoder> ficPunc(new PuncturingEncoder()); std::vector<PuncturingRule*> rules = fic->get_rules(); std::vector<PuncturingRule*>::const_iterator rule; for (rule = rules.begin(); rule != rules.end(); ++rule) { @@ -267,16 +268,12 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut) //////////////////////////////////////////////////////////////// // Configuring subchannels //////////////////////////////////////////////////////////////// - std::vector<SubchannelSource*> subchannels = + std::vector<shared_ptr<SubchannelSource> > subchannels = myEtiReader.getSubchannels(); - std::vector<SubchannelSource*>::const_iterator subchannel; + std::vector<shared_ptr<SubchannelSource> >::const_iterator subchannel; for (subchannel = subchannels.begin(); subchannel != subchannels.end(); ++subchannel) { - PrbsGenerator* subchPrbs = NULL; - ConvEncoder* subchConv = NULL; - PuncturingEncoder* subchPunc = NULL; - TimeInterleaver* subchInterleaver = NULL; //////////////////////////////////////////////////////////// // Data initialisation @@ -307,13 +304,17 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut) (*subchannel)->protectionOption()); // Configuring prbs genrerator - subchPrbs = new PrbsGenerator(subchSizeIn, 0x110); + shared_ptr<PrbsGenerator> subchPrbs( + new PrbsGenerator(subchSizeIn, 0x110)); // Configuring convolutionnal encoder - subchConv = new ConvEncoder(subchSizeIn); + shared_ptr<ConvEncoder> subchConv( + new ConvEncoder(subchSizeIn)); // Configuring puncturing encoder - subchPunc = new PuncturingEncoder(); + shared_ptr<PuncturingEncoder> subchPunc( + new PuncturingEncoder()); + std::vector<PuncturingRule*> rules = (*subchannel)->get_rules(); std::vector<PuncturingRule*>::const_iterator rule; for (rule = rules.begin(); rule != rules.end(); ++rule) { @@ -326,7 +327,8 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut) subchPunc->append_tail_rule(PuncturingRule(3, 0xcccccc)); // Configuring time interleaver - subchInterleaver = new TimeInterleaver(subchSizeOut); + shared_ptr<TimeInterleaver> subchInterleaver( + new TimeInterleaver(subchSizeOut)); myFlowgraph->connect(*subchannel, subchPrbs); myFlowgraph->connect(subchPrbs, subchConv); @@ -342,7 +344,7 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut) myFlowgraph->connect(cifFreq, cifDiff); myFlowgraph->connect(cifNull, cifSig); myFlowgraph->connect(cifDiff, cifSig); - if (myClockRate) { + if (useCicEq) { myFlowgraph->connect(cifSig, cifCicEq); myFlowgraph->connect(cifCicEq, cifOfdm); } else { @@ -352,18 +354,21 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut) myFlowgraph->connect(cifGain, cifGuard); if (myFilterTapsFilename != "") { - myFlowgraph->connect(cifGuard, cifFilter); + shared_ptr<FIRFilter> cifFilterptr(cifFilter); + myFlowgraph->connect(cifGuard, cifFilterptr); if (cifRes != NULL) { - myFlowgraph->connect(cifFilter, cifRes); - myFlowgraph->connect(cifRes, myOutput); + shared_ptr<Resampler> res(cifRes); + myFlowgraph->connect(cifFilterptr, res); + myFlowgraph->connect(res, myOutput); } else { - myFlowgraph->connect(cifFilter, myOutput); + myFlowgraph->connect(cifFilterptr, myOutput); } } else { //no filtering if (cifRes != NULL) { - myFlowgraph->connect(cifGuard, cifRes); - myFlowgraph->connect(cifRes, myOutput); + shared_ptr<Resampler> res(cifRes); + myFlowgraph->connect(cifGuard, res); + myFlowgraph->connect(res, myOutput); } else { myFlowgraph->connect(cifGuard, myOutput); } @@ -374,6 +379,6 @@ int DabModulator::process(Buffer* const dataIn, Buffer* dataOut) //////////////////////////////////////////////////////////////////// // Proccessing data //////////////////////////////////////////////////////////////////// - myOutput->setOutput(dataOut); return myFlowgraph->run(); } + diff --git a/src/DabModulator.h b/src/DabModulator.h index 84c9926..1a9e477 100644 --- a/src/DabModulator.h +++ b/src/DabModulator.h @@ -3,8 +3,10 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Includes modifications for which no copyright is claimed - 2012, Matthias P. Braendli, matthias.braendli@mpb.li + Copyright (C) 2015 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://opendigitalradio.org */ /* This file is part of ODR-DabMod. @@ -32,6 +34,7 @@ #include <sys/types.h> #include <string> +#include <boost/shared_ptr.hpp> #include "ModCodec.h" #include "EtiReader.h" @@ -46,9 +49,8 @@ class DabModulator : public ModCodec { public: DabModulator( - struct modulator_offset_config& modconf, + double tist_offset_s, unsigned tist_delay_stages, RemoteControllers* rcs, - Logger& logger, unsigned outputRate = 2048000, unsigned clockRate = 0, unsigned dabMode = 0, GainMode gainMode = GAIN_VAR, float digGain = 1.0, float normalise = 1.0, @@ -63,8 +65,6 @@ public: EtiReader* getEtiReader() { return &myEtiReader; } protected: - Logger& myLogger; - void setMode(unsigned mode); unsigned myOutputRate; @@ -88,5 +88,5 @@ protected: size_t myFicSizeIn; }; - #endif // DAB_MODULATOR_H + diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp index fe54f55..f584275 100644 --- a/src/EtiReader.cpp +++ b/src/EtiReader.cpp @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2014 + Copyright (C) 2014, 2015 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -34,6 +34,7 @@ #include <string.h> #include <arpa/inet.h> +using namespace boost; enum ETI_READER_STATE { EtiReaderStateNbFrame, @@ -50,16 +51,20 @@ enum ETI_READER_STATE { }; -EtiReader::EtiReader(struct modulator_offset_config& modconf, - Logger& logger) : - myLogger(logger), +EtiReader::EtiReader( + double tist_offset_s, + unsigned tist_delay_stages, + RemoteControllers* rcs) : state(EtiReaderStateSync), myFicSource(NULL), - myTimestampDecoder(modconf, myLogger) + myTimestampDecoder(tist_offset_s, tist_delay_stages) { PDEBUG("EtiReader::EtiReader()\n"); + myTimestampDecoder.enrol_at(*rcs); + myCurrentFrame = 0; + eti_fc_valid = false; } EtiReader::~EtiReader() @@ -69,9 +74,6 @@ EtiReader::~EtiReader() // if (myFicSource != NULL) { // delete myFicSource; // } -// for (unsigned i = 0; i < mySources.size(); ++i) { -// delete mySources[i]; -// } } @@ -83,23 +85,29 @@ FicSource* EtiReader::getFic() unsigned EtiReader::getMode() { + if (not eti_fc_valid) { + throw std::runtime_error("Trying to access Mode before it is ready!"); + } return eti_fc.MID; } unsigned EtiReader::getFp() { + if (not eti_fc_valid) { + throw std::runtime_error("Trying to access FP before it is ready!"); + } return eti_fc.FP; } -const std::vector<SubchannelSource*>& EtiReader::getSubchannels() +const std::vector<boost::shared_ptr<SubchannelSource> >& EtiReader::getSubchannels() { return mySources; } -int EtiReader::process(Buffer* dataIn) +int EtiReader::process(const Buffer* dataIn) { PDEBUG("EtiReader::process(dataIn: %p)\n", dataIn); PDEBUG(" state: %u\n", state); @@ -146,6 +154,7 @@ int EtiReader::process(Buffer* dataIn) return dataIn->getLength() - input_size; } memcpy(&eti_fc, in, 4); + eti_fc_valid = true; input_size -= 4; framesize -= 4; in += 4; @@ -171,13 +180,12 @@ int EtiReader::process(Buffer* dataIn) (memcmp(&eti_stc[0], in, 4 * eti_fc.NST))) { PDEBUG("New stc!\n"); eti_stc.resize(eti_fc.NST); - for (unsigned i = 0; i < mySources.size(); ++i) { - delete mySources[i]; - } - mySources.resize(eti_fc.NST); memcpy(&eti_stc[0], in, 4 * eti_fc.NST); + + mySources.clear(); for (unsigned i = 0; i < eti_fc.NST; ++i) { - mySources[i] = new SubchannelSource(eti_stc[i]); + mySources.push_back(shared_ptr<SubchannelSource>( + new SubchannelSource(eti_stc[i]))); PDEBUG("Sstc %u:\n", i); PDEBUG(" Stc%i.scid: %i\n", i, eti_stc[i].SCID); PDEBUG(" Stc%i.sad: %u\n", i, eti_stc[i].getStartAddress()); @@ -281,11 +289,6 @@ int EtiReader::process(Buffer* dataIn) myTimestampDecoder.updateTimestampEti(eti_fc.FP & 0x3, eti_eoh.MNSC, getPPSOffset(), eti_fc.FCT); - if (eti_fc.FCT % 125 == 0) //every 3 seconds is fine enough - { - myTimestampDecoder.updateModulatorOffset(); - } - return dataIn->getLength() - input_size; } diff --git a/src/EtiReader.h b/src/EtiReader.h index 209b208..84ad9b4 100644 --- a/src/EtiReader.h +++ b/src/EtiReader.h @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2014 + Copyright (C) 2014, 2015 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -41,12 +41,16 @@ #include <vector> #include <stdint.h> #include <sys/types.h> +#include <boost/shared_ptr.hpp> class EtiReader { public: - EtiReader(struct modulator_offset_config& modconf, Logger& logger); + EtiReader( + double tist_offset_s, + unsigned tist_delay_stages, + RemoteControllers* rcs); virtual ~EtiReader(); EtiReader(const EtiReader&); EtiReader& operator=(const EtiReader&); @@ -54,8 +58,8 @@ public: FicSource* getFic(); unsigned getMode(); unsigned getFp(); - const std::vector<SubchannelSource*>& getSubchannels(); - int process(Buffer* dataIn); + const std::vector<boost::shared_ptr<SubchannelSource> >& getSubchannels(); + int process(const Buffer* dataIn); void calculateTimestamp(struct frame_timestamp& ts) { @@ -66,9 +70,6 @@ public: bool sourceContainsTimestamp(); protected: - /* Main program logger */ - Logger& myLogger; - /* Transform the ETI TIST to a PPS offset in ms */ double getPPSOffset(); @@ -83,14 +84,14 @@ protected: eti_EOF eti_eof; eti_TIST eti_tist; FicSource* myFicSource; - std::vector<SubchannelSource*> mySources; + std::vector<boost::shared_ptr<SubchannelSource> > mySources; TimestampDecoder myTimestampDecoder; - + private: size_t myCurrentFrame; - bool time_ext_enabled; - unsigned long timestamp_seconds; + bool eti_fc_valid; }; #endif // ETI_READER_H + diff --git a/src/FIRFilter.cpp b/src/FIRFilter.cpp index 805c6d2..b1ce618 100644 --- a/src/FIRFilter.cpp +++ b/src/FIRFilter.cpp @@ -36,6 +36,8 @@ #include <iostream> #include <fstream> +#include <boost/make_shared.hpp> + #ifdef __AVX__ # include <immintrin.h> #else @@ -58,11 +60,10 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd) // the incoming buffer while(running) { - Buffer* dataIn; + boost::shared_ptr<Buffer> dataIn; fwd->input_queue.wait_and_pop(dataIn); - Buffer* dataOut; - dataOut = new Buffer(); + boost::shared_ptr<Buffer> dataOut = boost::make_shared<Buffer>(); dataOut->setLength(dataIn->getLength()); PDEBUG("FIRFilterWorker: dataIn->getLength() %zu\n", dataIn->getLength()); @@ -91,7 +92,7 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd) fprintf(stderr, "FIRFilterWorker: out not aligned %p ", out); throw std::runtime_error("FIRFilterWorker: out not aligned"); } - + clock_gettime(CLOCK_THREAD_CPUTIME_ID, &time_start); __m256 AVXout; @@ -141,7 +142,7 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd) fprintf(stderr, "FIRFilterWorker: out not aligned %p ", out); throw std::runtime_error("FIRFilterWorker: out not aligned"); } - + clock_gettime(CLOCK_THREAD_CPUTIME_ID, &time_start); __m128 SSEout; @@ -290,11 +291,10 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd) } } #endif - + calculationTime += (time_end.tv_sec - time_start.tv_sec) * 1000000000L + time_end.tv_nsec - time_start.tv_nsec; fwd->output_queue.push(dataOut); - delete dataIn; } } @@ -393,17 +393,16 @@ int FIRFilter::process(Buffer* const dataIn, Buffer* dataOut) // This thread creates the dataIn buffer, and deletes // the outgoing buffer - Buffer* inbuffer = new Buffer(dataIn->getLength(), dataIn->getData()); + boost::shared_ptr<Buffer> inbuffer = + boost::make_shared<Buffer>(dataIn->getLength(), dataIn->getData()); firwd.input_queue.push(inbuffer); if (number_of_runs > 2) { - Buffer* outbuffer; + boost::shared_ptr<Buffer> outbuffer; firwd.output_queue.wait_and_pop(outbuffer); dataOut->setData(outbuffer->getData(), outbuffer->getLength()); - - delete outbuffer; } else { dataOut->setLength(dataIn->getLength()); diff --git a/src/FIRFilter.h b/src/FIRFilter.h index 0ecae3e..751be91 100644 --- a/src/FIRFilter.h +++ b/src/FIRFilter.h @@ -30,7 +30,7 @@ #endif #include <boost/thread.hpp> -#include "ThreadsafeQueue.h" +#include <boost/shared_ptr.hpp> #include "RemoteControl.h" #include "ModCodec.h" @@ -52,8 +52,8 @@ struct FIRFilterWorkerData { /* Thread-safe queues to give data to and get data from * the worker */ - ThreadsafeQueue<Buffer*> input_queue; - ThreadsafeQueue<Buffer*> output_queue; + ThreadsafeQueue<boost::shared_ptr<Buffer> > input_queue; + ThreadsafeQueue<boost::shared_ptr<Buffer> > output_queue; /* Remote-control can change the taps while the filter * runs. This lock makes sure nothing bad happens when @@ -127,5 +127,5 @@ protected: struct FIRFilterWorkerData firwd; }; - #endif //FIRFILTER_H + diff --git a/src/Flowgraph.cpp b/src/Flowgraph.cpp index dd9c68b..3844e86 100644 --- a/src/Flowgraph.cpp +++ b/src/Flowgraph.cpp @@ -1,6 +1,11 @@ /* Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) + + Copyright (C) 2015 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://opendigitalradio.org */ /* This file is part of ODR-DabMod. @@ -38,16 +43,18 @@ #include <sys/time.h> #endif +using namespace boost; -typedef std::vector<Node*>::iterator NodeIterator; -typedef std::vector<Edge*>::iterator EdgeIterator; +typedef std::vector<shared_ptr<Node> >::iterator NodeIterator; +typedef std::vector<shared_ptr<Edge> >::iterator EdgeIterator; -Node::Node(ModPlugin* plugin) : +Node::Node(shared_ptr<ModPlugin> plugin) : myPlugin(plugin), myProcessTime(0) { - PDEBUG("Node::Node(plugin(%s): %p) @ %p\n", plugin->name(), plugin, this); + PDEBUG("Node::Node(plugin(%s): %p) @ %p\n", + plugin->name(), plugin.get(), this); } @@ -56,24 +63,21 @@ Node::~Node() { PDEBUG("Node::~Node() @ %p\n", this); - if (myPlugin != NULL) { - delete myPlugin; - } assert(myInputBuffers.size() == 0); assert(myOutputBuffers.size() == 0); } -Edge::Edge(Node* srcNode, Node* dstNode) : +Edge::Edge(shared_ptr<Node>& srcNode, shared_ptr<Node>& dstNode) : mySrcNode(srcNode), myDstNode(dstNode) { PDEBUG("Edge::Edge(srcNode(%s): %p, dstNode(%s): %p) @ %p\n", - srcNode->plugin()->name(), srcNode, - dstNode->plugin()->name(), dstNode, + srcNode->plugin()->name(), srcNode.get(), + dstNode->plugin()->name(), dstNode.get(), this); - myBuffer = new Buffer(); + myBuffer = shared_ptr<Buffer>(new Buffer()); srcNode->myOutputBuffers.push_back(myBuffer); dstNode->myInputBuffers.push_back(myBuffer); } @@ -83,7 +87,7 @@ Edge::~Edge() { PDEBUG("Edge::~Edge() @ %p\n", this); - std::vector<Buffer*>::iterator buffer; + std::vector<shared_ptr<Buffer> >::iterator buffer; if (myBuffer != NULL) { for (buffer = mySrcNode->myOutputBuffers.begin(); buffer != mySrcNode->myOutputBuffers.end(); @@ -102,7 +106,6 @@ Edge::~Edge() break; } } - delete myBuffer; } } @@ -110,9 +113,26 @@ Edge::~Edge() int Node::process() { PDEBUG("Edge::process()\n"); - PDEBUG(" Plugin name: %s (%p)\n", myPlugin->name(), myPlugin); + PDEBUG(" Plugin name: %s (%p)\n", myPlugin->name(), myPlugin.get()); + + // the plugin process() still wants vector<Buffer*> + // arguments. + std::vector<Buffer*> inBuffers; + std::vector<shared_ptr<Buffer> >::iterator buffer; + for (buffer = myInputBuffers.begin(); + buffer != myInputBuffers.end(); + ++buffer) { + inBuffers.push_back(buffer->get()); + } - return myPlugin->process(myInputBuffers, myOutputBuffers); + std::vector<Buffer*> outBuffers; + for (buffer = myOutputBuffers.begin(); + buffer != myOutputBuffers.end(); + ++buffer) { + outBuffers.push_back(buffer->get()); + } + + return myPlugin->process(inBuffers, outBuffers); } @@ -128,35 +148,26 @@ Flowgraph::~Flowgraph() { PDEBUG("Flowgraph::~Flowgraph() @ %p\n", this); - std::vector<Edge*>::const_iterator edge; - for (edge = edges.begin(); edge != edges.end(); ++edge) { - delete *edge; - } - if (myProcessTime) { fprintf(stderr, "Process time:\n"); - } - std::vector<Node*>::const_iterator node; - for (node = nodes.begin(); node != nodes.end(); ++node) { - if (myProcessTime) { + + std::vector<shared_ptr<Node> >::const_iterator node; + for (node = nodes.begin(); node != nodes.end(); ++node) { fprintf(stderr, " %30s: %10u us (%2.2f %%)\n", (*node)->plugin()->name(), (unsigned)(*node)->processTime(), (*node)->processTime() * 100.0 / myProcessTime); } - delete *node; - } - if (myProcessTime) { + fprintf(stderr, " %30s: %10u us (100.00 %%)\n", "total", (unsigned)myProcessTime); } } - -void Flowgraph::connect(ModPlugin* input, ModPlugin* output) +void Flowgraph::connect(shared_ptr<ModPlugin> input, shared_ptr<ModPlugin> output) { PDEBUG("Flowgraph::connect(input(%s): %p, output(%s): %p)\n", - input->name(), input, output->name(), output); + input->name(), input.get(), output->name(), output.get()); NodeIterator inputNode; NodeIterator outputNode; @@ -167,7 +178,7 @@ void Flowgraph::connect(ModPlugin* input, ModPlugin* output) } } if (inputNode == nodes.end()) { - inputNode = nodes.insert(nodes.end(), new Node(input)); + inputNode = nodes.insert(nodes.end(), shared_ptr<Node>(new Node(input))); } for (outputNode = nodes.begin(); outputNode != nodes.end(); ++outputNode) { @@ -176,14 +187,14 @@ void Flowgraph::connect(ModPlugin* input, ModPlugin* output) } } if (outputNode == nodes.end()) { - outputNode = nodes.insert(nodes.end(), new Node(output)); + outputNode = nodes.insert(nodes.end(), shared_ptr<Node>(new Node(output))); for (inputNode = nodes.begin(); inputNode != nodes.end(); ++inputNode) { if ((*inputNode)->plugin() == input) { break; } } } else if (inputNode > outputNode) { - Node* node = *outputNode; + shared_ptr<Node> node = *outputNode; nodes.erase(outputNode); outputNode = nodes.insert(nodes.end(), node); for (inputNode = nodes.begin(); inputNode != nodes.end(); ++inputNode) { @@ -196,7 +207,7 @@ void Flowgraph::connect(ModPlugin* input, ModPlugin* output) assert((*inputNode)->plugin() == input); assert((*outputNode)->plugin() == output); - edges.push_back(new Edge(*inputNode, *outputNode)); + edges.push_back(shared_ptr<Edge>(new Edge(*inputNode, *outputNode))); } @@ -204,7 +215,7 @@ bool Flowgraph::run() { PDEBUG("Flowgraph::run()\n"); - std::vector<Node*>::const_iterator node; + std::vector<shared_ptr<Node> >::const_iterator node; timeval start, stop; time_t diff; @@ -224,3 +235,4 @@ bool Flowgraph::run() } return true; } + diff --git a/src/Flowgraph.h b/src/Flowgraph.h index 178b6a9..1129668 100644 --- a/src/Flowgraph.h +++ b/src/Flowgraph.h @@ -1,6 +1,11 @@ /* Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) + + Copyright (C) 2015 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://opendigitalradio.org */ /* This file is part of ODR-DabMod. @@ -32,20 +37,21 @@ #include <sys/types.h> #include <vector> +#include <boost/shared_ptr.hpp> class Node { public: - Node(ModPlugin* plugin); + Node(boost::shared_ptr<ModPlugin> plugin); ~Node(); Node(const Node&); Node& operator=(const Node&); - ModPlugin* plugin() { return myPlugin; } + boost::shared_ptr<ModPlugin> plugin() { return myPlugin; } - std::vector<Buffer*> myInputBuffers; - std::vector<Buffer*> myOutputBuffers; + std::vector<boost::shared_ptr<Buffer> > myInputBuffers; + std::vector<boost::shared_ptr<Buffer> > myOutputBuffers; int process(); time_t processTime() { return myProcessTime; } @@ -54,7 +60,7 @@ public: } protected: - ModPlugin* myPlugin; + boost::shared_ptr<ModPlugin> myPlugin; time_t myProcessTime; }; @@ -62,15 +68,15 @@ protected: class Edge { public: - Edge(Node* src, Node* dst); + Edge(boost::shared_ptr<Node>& src, boost::shared_ptr<Node>& dst); ~Edge(); Edge(const Edge&); Edge& operator=(const Edge&); protected: - Node* mySrcNode; - Node* myDstNode; - Buffer* myBuffer; + boost::shared_ptr<Node> mySrcNode; + boost::shared_ptr<Node> myDstNode; + boost::shared_ptr<Buffer> myBuffer; }; @@ -82,14 +88,16 @@ public: Flowgraph(const Flowgraph&); Flowgraph& operator=(const Flowgraph&); - void connect(ModPlugin* input, ModPlugin* output); + void connect(boost::shared_ptr<ModPlugin> input, + boost::shared_ptr<ModPlugin> output); bool run(); protected: - std::vector<Node*> nodes; - std::vector<Edge*> edges; + std::vector<boost::shared_ptr<Node> > nodes; + std::vector<boost::shared_ptr<Edge> > edges; time_t myProcessTime; }; #endif // FLOWGRAPH_H + diff --git a/src/FrameMultiplexer.cpp b/src/FrameMultiplexer.cpp index c5e58b7..843f72d 100644 --- a/src/FrameMultiplexer.cpp +++ b/src/FrameMultiplexer.cpp @@ -30,8 +30,11 @@ typedef std::complex<float> complexf; +using namespace boost; -FrameMultiplexer::FrameMultiplexer(size_t framesize, const std::vector<SubchannelSource*>* subchannels) : +FrameMultiplexer::FrameMultiplexer( + size_t framesize, + const std::vector<shared_ptr<SubchannelSource> >* subchannels) : ModMux(ModFormat(framesize), ModFormat(framesize)), d_frameSize(framesize), mySubchannels(subchannels) @@ -76,7 +79,7 @@ int FrameMultiplexer::process(std::vector<Buffer*> dataIn, Buffer* dataOut) ++in; // Write subchannel assert(mySubchannels->size() == dataIn.size() - 1); - std::vector<SubchannelSource*>::const_iterator subchannel = + std::vector<shared_ptr<SubchannelSource> >::const_iterator subchannel = mySubchannels->begin(); while (in != dataIn.end()) { assert((*subchannel)->framesizeCu() * 8 == (*in)->getLength()); @@ -88,3 +91,4 @@ int FrameMultiplexer::process(std::vector<Buffer*> dataIn, Buffer* dataOut) return dataOut->getLength(); } + diff --git a/src/FrameMultiplexer.h b/src/FrameMultiplexer.h index f1bd587..ba571f6 100644 --- a/src/FrameMultiplexer.h +++ b/src/FrameMultiplexer.h @@ -29,7 +29,7 @@ #include "ModMux.h" #include "SubchannelSource.h" - +#include <boost/shared_ptr.hpp> #include <sys/types.h> @@ -37,7 +37,8 @@ class FrameMultiplexer : public ModMux { public: - FrameMultiplexer(size_t frameSize, const std::vector<SubchannelSource*>* subchannels); + FrameMultiplexer(size_t frameSize, + const std::vector<boost::shared_ptr<SubchannelSource> >* subchannels); virtual ~FrameMultiplexer(); FrameMultiplexer(const FrameMultiplexer&); FrameMultiplexer& operator=(const FrameMultiplexer&); @@ -48,8 +49,8 @@ public: protected: size_t d_frameSize; - const std::vector<SubchannelSource*>* mySubchannels; + const std::vector<boost::shared_ptr<SubchannelSource> >* mySubchannels; }; - #endif // FRAME_MULTIPLEXER_H + diff --git a/src/InputFileReader.cpp b/src/InputFileReader.cpp index 205fbfa..84f0be4 100644 --- a/src/InputFileReader.cpp +++ b/src/InputFileReader.cpp @@ -49,8 +49,7 @@ int InputFileReader::Open(std::string filename, bool loop) loop_ = loop; inputfile_ = fopen(filename_.c_str(), "r"); if (inputfile_ == NULL) { - fprintf(stderr, "Unable to open input file!\n"); - logger_.level(error) << "Unable to open input file!"; + etiLog.level(error) << "Unable to open input file!"; perror(filename_.c_str()); return -1; } @@ -79,8 +78,7 @@ int InputFileReader::IdentifyType() char discard_buffer[6144]; if (fread(&sync, sizeof(sync), 1, inputfile_) != 1) { - fprintf(stderr, "Unable to read sync in input file!\n"); - logger_.level(error) << "Unable to read sync in input file!"; + etiLog.level(error) << "Unable to read sync in input file!"; perror(filename_.c_str()); return -1; } @@ -96,8 +94,7 @@ int InputFileReader::IdentifyType() // if the seek fails, consume the rest of the frame if (fread(discard_buffer, 6144 - sizeof(sync), 1, inputfile_) != 1) { - fprintf(stderr, "Unable to read from input file!\n"); - logger_.level(error) << "Unable to read from input file!"; + etiLog.level(error) << "Unable to read from input file!"; perror(filename_.c_str()); return -1; } @@ -108,8 +105,7 @@ int InputFileReader::IdentifyType() nbFrames = sync; if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) { - fprintf(stderr, "Unable to read frame size in input file!\n"); - logger_.level(error) << "Unable to read frame size in input file!"; + etiLog.level(error) << "Unable to read frame size in input file!"; perror(filename_.c_str()); return -1; } @@ -130,8 +126,7 @@ int InputFileReader::IdentifyType() // if the seek fails, consume the rest of the frame if (fread(discard_buffer, frameSize - 4, 1, inputfile_) != 1) { - fprintf(stderr, "Unable to read from input file!\n"); - logger_.level(error) << "Unable to read from input file!"; + etiLog.level(error) << "Unable to read from input file!"; perror(filename_.c_str()); return -1; } @@ -141,8 +136,7 @@ int InputFileReader::IdentifyType() } if (fread(&sync, sizeof(sync), 1, inputfile_) != 1) { - fprintf(stderr, "Unable to read nb frame in input file!\n"); - logger_.level(error) << "Unable to read nb frame in input file!"; + etiLog.level(error) << "Unable to read nb frame in input file!"; perror(filename_.c_str()); return -1; } @@ -152,8 +146,7 @@ int InputFileReader::IdentifyType() // if the seek fails, consume the rest of the frame if (fread(discard_buffer, frameSize - 4, 1, inputfile_) != 1) { - fprintf(stderr, "Unable to read from input file!\n"); - logger_.level(error) << "Unable to read from input file!"; + etiLog.level(error) << "Unable to read from input file!"; perror(filename_.c_str()); return -1; } @@ -168,8 +161,7 @@ int InputFileReader::IdentifyType() sync >>= 8; sync &= 0xffffff; if (fread((uint8_t*)&sync + 3, 1, 1, inputfile_) != 1) { - fprintf(stderr, "Unable to read from input file!\n"); - logger_.level(error) << "Unable to read from input file!"; + etiLog.level(error) << "Unable to read from input file!"; perror(filename_.c_str()); return -1; } @@ -184,8 +176,7 @@ int InputFileReader::IdentifyType() if (fseek(inputfile_, -sizeof(sync), SEEK_CUR) != 0) { if (fread(discard_buffer, 6144 - sizeof(sync), 1, inputfile_) != 1) { - fprintf(stderr, "Unable to read from input file!\n"); - logger_.level(error) << "Unable to read from input file!"; + etiLog.level(error) << "Unable to read from input file!"; perror(filename_.c_str()); return -1; } @@ -195,8 +186,7 @@ int InputFileReader::IdentifyType() } } - fprintf(stderr, "Bad input file format!\n"); - logger_.level(error) << "Bad input file format!"; + etiLog.level(error) << "Bad input file format!"; return -1; } @@ -236,18 +226,18 @@ int InputFileReader::GetNextFrame(void* buffer) } else { if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) { - logger_.level(error) << "Reached end of file."; + etiLog.level(error) << "Reached end of file."; if (loop_) { if (Rewind() == 0) { if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) { PDEBUG("Error after rewinding file!\n"); - logger_.level(error) << "Error after rewinding file!"; + etiLog.level(error) << "Error after rewinding file!"; return -1; } } else { PDEBUG("Impossible to rewind file!\n"); - logger_.level(error) << "Impossible to rewind file!"; + etiLog.level(error) << "Impossible to rewind file!"; return -1; } } @@ -257,8 +247,7 @@ int InputFileReader::GetNextFrame(void* buffer) } } if (frameSize > 6144) { // there might be a better limit - logger_.level(error) << "Wrong frame size " << frameSize << " in ETI file!"; - fprintf(stderr, "Wrong frame size %u in ETI file!\n", frameSize); + etiLog.level(error) << "Wrong frame size " << frameSize << " in ETI file!"; return -1; } @@ -275,7 +264,7 @@ int InputFileReader::GetNextFrame(void* buffer) } else { PDEBUG("Impossible to rewind file!\n"); - logger_.level(error) << "Impossible to rewind file!"; + etiLog.level(error) << "Impossible to rewind file!"; return -1; } } @@ -285,12 +274,8 @@ int InputFileReader::GetNextFrame(void* buffer) // A short read of a frame (i.e. reading an incomplete frame) // is not tolerated. Input files must not contain incomplete frames if (read_bytes != 0) { - fprintf(stderr, - "Unable to read a complete frame of %u data bytes from input file!\n", - frameSize); - - perror(filename_.c_str()); - logger_.level(error) << "Unable to read from input file!"; + etiLog.level(error) << + "Unable to read a complete frame of " << frameSize << " data bytes from input file!"; return -1; } else { diff --git a/src/InputReader.h b/src/InputReader.h index 3e0dcab..b262cc9 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyrigth (C) 2013 + Copyrigth (C) 2013, 2015 Matthias P. Braendli, matthias.braendli@mpb.li */ /* @@ -31,6 +31,8 @@ #endif #include <cstdio> +#include <vector> +#include <boost/shared_ptr.hpp> #if defined(HAVE_ZEROMQ) # include "zmq.hpp" # include "ThreadsafeQueue.h" @@ -85,15 +87,15 @@ class InputReader class InputFileReader : public InputReader { public: - InputFileReader(Logger logger) : + InputFileReader() : streamtype_(ETI_STREAM_TYPE_NONE), - inputfile_(NULL), logger_(logger) {}; + inputfile_(NULL) { } ~InputFileReader() { - fprintf(stderr, "\nClosing input file...\n"); - if (inputfile_ != NULL) { + fprintf(stderr, "\nClosing input file...\n"); + fclose(inputfile_); } } @@ -113,6 +115,9 @@ class InputFileReader : public InputReader } private: + InputFileReader(const InputFileReader& other); + InputFileReader& operator=(const InputFileReader& other); + int IdentifyType(); // Rewind the file, and replay anew @@ -123,20 +128,30 @@ class InputFileReader : public InputReader std::string filename_; EtiStreamType streamtype_; FILE* inputfile_; - Logger logger_; size_t inputfilelength_; uint64_t nbframes_; // 64-bit because 32-bit overflow is // after 2**32 * 24ms ~= 3.3 years }; +struct zmq_input_overflow : public std::exception +{ + const char* what () const throw () + { + return "InputZMQ buffer overflow"; + } +}; + #if defined(HAVE_ZEROMQ) /* A ZeroMQ input. See www.zeromq.org for more info */ struct InputZeroMQThreadData { - ThreadsafeQueue<uint8_t*> *in_messages; + ThreadsafeQueue<boost::shared_ptr<std::vector<uint8_t> > > *in_messages; std::string uri; + unsigned max_queued_frames; + + bool running; }; class InputZeroMQWorker @@ -168,10 +183,10 @@ class InputZeroMQWorker class InputZeroMQReader : public InputReader { public: - InputZeroMQReader(Logger logger) : - logger_(logger), in_messages_(10) + InputZeroMQReader() { workerdata_.in_messages = &in_messages_; + workerdata_.running = false; } ~InputZeroMQReader() @@ -179,21 +194,22 @@ class InputZeroMQReader : public InputReader worker_.Stop(); } - int Open(std::string uri); + int Open(const std::string& uri, unsigned max_queued_frames); int GetNextFrame(void* buffer); void PrintInfo(); private: - InputZeroMQReader(const InputZeroMQReader& other) {} - Logger logger_; + InputZeroMQReader(const InputZeroMQReader& other); + InputZeroMQReader& operator=(const InputZeroMQReader& other); std::string uri_; InputZeroMQWorker worker_; - ThreadsafeQueue<uint8_t*> in_messages_; + ThreadsafeQueue<boost::shared_ptr<std::vector<uint8_t> > > in_messages_; struct InputZeroMQThreadData workerdata_; }; #endif #endif + diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index f7f5702..36d4e4b 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2013, 2014 + Copyright (C) 2013, 2014, 2015 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -37,12 +37,11 @@ #include <stdint.h> #include "zmq.hpp" #include <boost/thread/thread.hpp> +#include <boost/make_shared.hpp> #include "porting.h" #include "InputReader.h" #include "PcDebug.h" -#define MAX_QUEUE_SIZE 50 - #define NUM_FRAMES_PER_ZMQ_MESSAGE 4 /* A concatenation of four ETI frames, * whose maximal size is 6144. @@ -64,10 +63,18 @@ struct zmq_dab_message_t uint8_t buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144]; }; -int InputZeroMQReader::Open(std::string uri) +int InputZeroMQReader::Open(const std::string& uri, unsigned max_queued_frames) { - uri_ = uri; + // The URL might start with zmq+tcp:// + if (uri.substr(0, 4) == "zmq+") { + uri_ = uri.substr(4); + } + else { + uri_ = uri; + } + workerdata_.uri = uri; + workerdata_.max_queued_frames = max_queued_frames; // launch receiver thread worker_.Start(&workerdata_); @@ -78,12 +85,25 @@ int InputZeroMQReader::GetNextFrame(void* buffer) { const size_t framesize = 6144; - uint8_t* incoming; - in_messages_.wait_and_pop(incoming); + boost::shared_ptr<std::vector<uint8_t> > incoming; - memcpy(buffer, incoming, framesize); + /* Do some prebuffering because reads will happen in bursts + * (4 ETI frames in TM1) and we should make sure that + * we can serve the data required for a full transmission frame. + */ + if (in_messages_.size() < 4) { + const size_t prebuffering = 10; + in_messages_.wait_and_pop(incoming, prebuffering); + } + else { + in_messages_.wait_and_pop(incoming); + } - delete incoming; + if (! workerdata_.running) { + throw zmq_input_overflow(); + } + + memcpy(buffer, &incoming->front(), framesize); return framesize; } @@ -123,18 +143,16 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) } m_to_drop--; } - else if (queue_size < MAX_QUEUE_SIZE) { + else if (queue_size < workerdata->max_queued_frames) { if (buffer_full) { - fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n", - queue_size); + etiLog.level(info) << "ZeroMQ buffer recovered: " << queue_size << " elements"; buffer_full = false; } zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); if (dab_msg->version != 1) { - fprintf(stderr, "ZeroMQ input: wrong packet version %d\n", - dab_msg->version); + etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; } int offset = sizeof(dab_msg->version) + @@ -145,23 +163,20 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) if (dab_msg->buflen[i] <= 0 || dab_msg->buflen[i] > 6144) { - fprintf(stderr, "ZeroMQ buffer %d: invalid length %d\n", - i, dab_msg->buflen[i]); + etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " << + dab_msg->buflen[i]; // TODO error handling } else { - uint8_t* buf = new uint8_t[6144]; + boost::shared_ptr<std::vector<uint8_t> > buf = + boost::make_shared<std::vector<uint8_t> >(6144, 0x55); const int framesize = dab_msg->buflen[i]; - memcpy(buf, + memcpy(&buf->front(), ((uint8_t*)incoming.data()) + offset, framesize); - // pad to 6144 bytes - memset(&((uint8_t*)buf)[framesize], - 0x55, 6144 - framesize); - offset += framesize; queue_size = workerdata->in_messages->push(buf); @@ -172,9 +187,10 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) workerdata->in_messages->notify(); if (!buffer_full) { - fprintf(stderr, "ZeroMQ buffer overfull !\n"); + etiLog.level(warn) << "ZeroMQ buffer overfull !"; buffer_full = true; + throw std::runtime_error("ZMQ input full"); } queue_size = workerdata->in_messages->size(); @@ -188,23 +204,28 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) } if (queue_size < 5) { - fprintf(stderr, "ZeroMQ buffer underfull: %zu elements !\n", - queue_size); + etiLog.level(warn) << "ZeroMQ buffer low: " << queue_size << " elements !"; } } } catch (zmq::error_t& err) { - fprintf(stderr, "ZeroMQ error in RecvProcess: '%s'\n", err.what()); + etiLog.level(error) << "ZeroMQ error in RecvProcess: '" << err.what() << "'"; + } + catch (std::exception& err) { } - fprintf(stderr, "ZeroMQ input worker terminated\n"); + etiLog.level(info) << "ZeroMQ input worker terminated"; subscriber.close(); + + workerdata->running = false; + workerdata->in_messages->notify(); } void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata) { running = true; + workerdata->running = true; recv_thread = boost::thread(&InputZeroMQWorker::RecvProcess, this, workerdata); } diff --git a/src/Makefile.am b/src/Makefile.am deleted file mode 100644 index f8ba7c2..0000000 --- a/src/Makefile.am +++ /dev/null @@ -1,127 +0,0 @@ -# Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012 Her Majesty the -# Queen in Right of Canada (Communications Research Center Canada) - -# This file is part of ODR-DabMod. -# -# ODR-DabMod is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# ODR-DabMod is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>. - -if IS_GIT_REPO -GITVERSION_FLAGS = -DGITVERSION="\"`git describe`\"" -else -GITVERSION_FLAGS = -endif - -if HAVE_SSE -SIMD_CFLAGS = -msse -msse2 -else -SIMD_CFLAGS = -endif - -bin_PROGRAMS = odr-dabmod - -if USE_KISS_FFT -FFT_DIR=$(top_builddir)/lib/kiss_fft129 -FFT_INC=-I$(FFT_DIR) -I$(FFT_DIR)/tools -FFT_SRC=$(FFT_DIR)/kiss_fft.c \ - $(FFT_DIR)/kiss_fft.h \ - $(FFT_DIR)/tools/kiss_fftr.c \ - $(FFT_DIR)/tools/kiss_fftr.h \ - kiss_fftsimd.c \ - kiss_fftsimd.h -FFT_FLG=-ffast-math - -.PHONY: kiss_fft129 reed-solomon-4.0 - -DabModulator.cpp: $(FFT_DIR) - -BUILT_SOURCES: $(FFT_DIR) - -FFT_LDADD= - -$(FFT_DIR): - if [ ! -e $(FFT_DIR) ]; then \ - tar xzf $(top_srcdir)/lib/kiss_fft129.tar.gz -C $(top_builddir)/lib; \ - fi - -else -FFT_LDADD= -FFT_DIR= -FFT_INC= -FFT_SRC= -FFT_FLG= -endif - -odr_dabmod_CPPFLAGS = -Wall \ - $(FFT_INC) $(FFT_FLG) $(SIMD_CFLAGS) $(GITVERSION_FLAGS) -odr_dabmod_LDADD = $(FFT_LDADD) -odr_dabmod_SOURCES = DabMod.cpp \ - PcDebug.h \ - porting.c porting.h \ - DabModulator.cpp DabModulator.h \ - Buffer.cpp Buffer.h \ - ModCodec.cpp ModCodec.h \ - ModPlugin.cpp ModPlugin.h \ - ModFormat.cpp ModFormat.h \ - EtiReader.cpp EtiReader.h \ - Eti.cpp Eti.h \ - FicSource.cpp FicSource.h \ - FIRFilter.cpp FIRFilter.h \ - ModInput.cpp ModInput.h \ - PuncturingRule.cpp PuncturingRule.h \ - PuncturingEncoder.cpp PuncturingEncoder.h \ - SubchannelSource.cpp SubchannelSource.h \ - Flowgraph.cpp Flowgraph.h \ - GainControl.cpp GainControl.h \ - OutputMemory.cpp OutputMemory.h \ - OutputZeroMQ.cpp OutputZeroMQ.h \ - TimestampDecoder.h TimestampDecoder.cpp \ - OutputUHD.cpp OutputUHD.h \ - ModOutput.cpp ModOutput.h \ - InputMemory.cpp InputMemory.h \ - InputFileReader.cpp InputZeroMQReader.cpp InputReader.h \ - OutputFile.cpp OutputFile.h \ - FrameMultiplexer.cpp FrameMultiplexer.h \ - ModMux.cpp ModMux.h \ - PrbsGenerator.cpp PrbsGenerator.h \ - BlockPartitioner.cpp BlockPartitioner.h \ - QpskSymbolMapper.cpp QpskSymbolMapper.h \ - FrequencyInterleaver.cpp FrequencyInterleaver.h \ - PhaseReference.cpp PhaseReference.h \ - DifferentialModulator.cpp DifferentialModulator.h \ - NullSymbol.cpp NullSymbol.h \ - SignalMultiplexer.cpp SignalMultiplexer.h \ - CicEqualizer.cpp CicEqualizer.h \ - OfdmGenerator.cpp OfdmGenerator.h \ - GuardIntervalInserter.cpp GuardIntervalInserter.h \ - Resampler.cpp Resampler.h \ - ConvEncoder.cpp ConvEncoder.h \ - TimeInterleaver.cpp TimeInterleaver.h \ - ThreadsafeQueue.h \ - Log.cpp Log.h \ - RemoteControl.cpp RemoteControl.h \ - FormatConverter.cpp FormatConverter.h \ - zmq.hpp - -nodist_odr_dabmod_SOURCES = $(FFT_SRC) - -dist_bin_SCRIPTS = crc-dwap.py - -if USE_KISS_FFT -EXTRA_DIST = kiss_fftsimd.c kiss_fftsimd.h - -clean-local: - rm -rf $(FFT_DIR) - -endif - diff --git a/src/OutputMemory.h b/src/OutputMemory.h index 2dd49c5..56cbc01 100644 --- a/src/OutputMemory.h +++ b/src/OutputMemory.h @@ -50,7 +50,7 @@ class OutputMemory : public ModOutput { public: - OutputMemory(Buffer* dataOut = NULL); + OutputMemory(Buffer* dataOut); virtual ~OutputMemory(); virtual int process(Buffer* dataIn, Buffer* dataOut); const char* name() { return "OutputMemory"; } diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index bfd24a8..b815a4c 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2014 + Copyright (C) 2014, 2015 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -31,6 +31,11 @@ #include "PcDebug.h" #include "Log.h" #include "RemoteControl.h" +#include "Utils.h" + +#include <boost/thread/future.hpp> + +#include <uhd/utils/msg.hpp> #include <cmath> #include <iostream> @@ -46,23 +51,61 @@ using namespace std; typedef std::complex<float> complexf; +void uhd_msg_handler(uhd::msg::type_t type, const std::string &msg) +{ + if (type == uhd::msg::warning) { + etiLog.level(warn) << "UHD Warning: " << msg; + } + else if (type == uhd::msg::error) { + etiLog.level(error) << "UHD Error: " << msg; + } +} + +// Check function for GPS fixtype +bool check_gps_fix_ok(uhd::usrp::multi_usrp::sptr usrp) +{ + try { + std::string fixtype( + usrp->get_mboard_sensor("gps_fixtype", 0).to_pp_string()); + + if (fixtype.find("3d fix") == std::string::npos) { + etiLog.level(warn) << "OutputUHD: " << fixtype; + + return false; + } + + return true; + } + catch (uhd::lookup_error &e) { + etiLog.level(warn) << "OutputUHD: no gps_fixtype sensor"; + return false; + } +} + + OutputUHD::OutputUHD( - OutputUHDConfig& config, - Logger& logger) : + const OutputUHDConfig& config) : ModOutput(ModFormat(1), ModFormat(0)), RemoteControllable("uhd"), - myLogger(logger), myConf(config), // Since we don't know the buffer size, we cannot initialise // the buffers at object initialisation. first_run(true), + gps_fix_verified(false), activebuffer(1), myDelayBuf(0) { - myMuting = 0; // is remote-controllable + myMuting = true; // is remote-controllable, and reset by the GPS fix check myStaticDelayUs = 0; // is remote-controllable + // Variables needed for GPS fix check + num_checks_without_gps_fix = 1; + first_gps_fix_check.tv_sec = 0; + last_gps_fix_check.tv_sec = 0; + time_last_frame.tv_sec = 0; + + #if FAKE_UHD MDEBUG("OutputUHD:Using fake UHD output"); #else @@ -91,7 +134,10 @@ OutputUHD::OutputUHD( RC_ADD_PARAMETER(freq, "UHD transmission frequency"); RC_ADD_PARAMETER(muting, "Mute the output by stopping the transmitter"); RC_ADD_PARAMETER(staticdelay, "Set static delay (uS) between 0 and 96000"); - RC_ADD_PARAMETER(iqbalance, "Set I/Q balance between 0 and 1.0"); + + // TODO: find out how to use boost::bind to give the logger to the + // uhd_msg_handler + uhd::msg::register_handler(uhd_msg_handler); uhd::set_thread_priority_safe(); @@ -152,59 +198,7 @@ OutputUHD::OutputUHD( MDEBUG("OutputUHD:Mute on missing timestamps: %s ...\n", myConf.muteNoTimestamps ? "enabled" : "disabled"); - if (myConf.enableSync && (myConf.pps_src == "none")) { - myLogger.level(warn) << - "OutputUHD: WARNING:" - " you are using synchronous transmission without PPS input!"; - - struct timespec now; - if (clock_gettime(CLOCK_REALTIME, &now)) { - perror("OutputUHD:Error: could not get time: "); - myLogger.level(error) << "OutputUHD: could not get time"; - } - else { - myUsrp->set_time_now(uhd::time_spec_t(now.tv_sec)); - myLogger.level(info) << "OutputUHD: Setting USRP time to " << - uhd::time_spec_t(now.tv_sec).get_real_secs(); - } - } - - if (myConf.pps_src != "none") { - /* handling time for synchronisation: wait until the next full - * second, and set the USRP time at next PPS */ - struct timespec now; - time_t seconds; - if (clock_gettime(CLOCK_REALTIME, &now)) { - myLogger.level(error) << "OutputUHD: could not get time :" << - strerror(errno); - throw std::runtime_error("OutputUHD: could not get time."); - } - else { - seconds = now.tv_sec; - - MDEBUG("OutputUHD:sec+1: %ld ; now: %ld ...\n", seconds+1, now.tv_sec); - while (seconds + 1 > now.tv_sec) { - usleep(1); - if (clock_gettime(CLOCK_REALTIME, &now)) { - myLogger.level(error) << "OutputUHD: could not get time :" << - strerror(errno); - throw std::runtime_error("OutputUHD: could not get time."); - } - } - MDEBUG("OutputUHD:sec+1: %ld ; now: %ld ...\n", seconds+1, now.tv_sec); - /* We are now shortly after the second change. */ - - usleep(200000); // 200ms, we want the PPS to be later - myUsrp->set_time_unknown_pps(uhd::time_spec_t(seconds + 2)); - myLogger.level(info) << "OutputUHD: Setting USRP time next pps to " << - uhd::time_spec_t(seconds + 2).get_real_secs(); - } - - usleep(1e6); - myLogger.log(info, "OutputUHD: USRP time %f\n", - myUsrp->get_time_now().get_real_secs()); - } - + set_usrp_time(); // preparing output thread worker data uwd.myUsrp = myUsrp; @@ -215,24 +209,27 @@ OutputUHD::OutputUHD( uwd.sampleRate = myConf.sampleRate; uwd.sourceContainsTimestamp = false; uwd.muteNoTimestamps = myConf.muteNoTimestamps; - uwd.logger = &myLogger; uwd.refclk_lock_loss_behaviour = myConf.refclk_lock_loss_behaviour; if (myConf.refclk_src == "internal") { uwd.check_refclk_loss = false; + uwd.check_gpsfix = false; + } + else if (myConf.refclk_src == "gpsdo") { + uwd.check_refclk_loss = true; + uwd.check_gpsfix = (myConf.maxGPSHoldoverTime != 0); } else { uwd.check_refclk_loss = true; + uwd.check_gpsfix = false; } - SetDelayBuffer(config.dabMode); + SetDelayBuffer(myConf.dabMode); shared_ptr<barrier> b(new barrier(2)); mySyncBarrier = b; uwd.sync_barrier = b; - worker.start(&uwd); - MDEBUG("OutputUHD:UHD ready.\n"); } @@ -247,32 +244,30 @@ OutputUHD::~OutputUHD() } } -void OutputUHD::SetDelayBuffer(unsigned int dabMode) +int transmission_frame_duration_ms(unsigned int dabMode) { - // find out the duration of the transmission frame (Table 2 in ETSI 300 401) switch (dabMode) { - case 0: // could happen when called from constructor and we take the mode from ETI - myTFDurationMs = 0; - break; - case 1: - myTFDurationMs = 96; - break; - case 2: - myTFDurationMs = 24; - break; - case 3: - myTFDurationMs = 24; - break; - case 4: - myTFDurationMs = 48; - break; - default: - throw std::runtime_error("OutPutUHD: invalid DAB mode"); - } - // The buffer size equals the number of samples per transmission frame so - // we calculate it by multiplying the duration of the transmission frame - // with the samplerate. - myDelayBuf.resize(myTFDurationMs * myConf.sampleRate / 1000); + // could happen when called from constructor and we take the mode from ETI + case 0: return 0; + + case 1: return 96; + case 2: return 24; + case 3: return 24; + case 4: return 48; + default: + throw std::runtime_error("OutputUHD: invalid DAB mode"); + } +} + +void OutputUHD::SetDelayBuffer(unsigned int dabMode) +{ + // find out the duration of the transmission frame (Table 2 in ETSI 300 401) + myTFDurationMs = transmission_frame_duration_ms(dabMode); + + // The buffer size equals the number of samples per transmission frame so + // we calculate it by multiplying the duration of the transmission frame + // with the samplerate. + myDelayBuf.resize(myTFDurationMs * myConf.sampleRate / 1000); } int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) @@ -286,8 +281,25 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) // the first buffer // We will only wait on the barrier on the subsequent calls to // OutputUHD::process - if (first_run) { - myLogger.level(debug) << "OutputUHD: UHD initialising..."; + if (not gps_fix_verified) { + if (uwd.check_gpsfix) { + initial_gps_check(); + + if (num_checks_without_gps_fix == 0) { + set_usrp_time(); + gps_fix_verified = true; + myMuting = false; + } + } + else { + gps_fix_verified = true; + myMuting = false; + } + } + else if (first_run) { + etiLog.level(debug) << "OutputUHD: UHD initialising..."; + + worker.start(&uwd); uwd.bufsize = dataIn->getLength(); uwd.frame0.buf = malloc(uwd.bufsize); @@ -310,28 +322,53 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) default: break; } - // we only set the delay buffer from the dab mode signaled in ETI if the - // dab mode was not set in contructor - if (myTFDurationMs == 0) - SetDelayBuffer(myEtiReader->getMode()); + // we only set the delay buffer from the dab mode signaled in ETI if the + // dab mode was not set in contructor + if (myTFDurationMs == 0) { + SetDelayBuffer(myEtiReader->getMode()); + } activebuffer = 1; lastLen = uwd.bufsize; first_run = false; - myLogger.level(debug) << "OutputUHD: UHD initialising complete"; + etiLog.level(debug) << "OutputUHD: UHD initialising complete"; } else { if (lastLen != dataIn->getLength()) { // I expect that this never happens. - myLogger.level(emerg) << + etiLog.level(emerg) << "OutputUHD: Fatal error, input length changed from " << lastLen << " to " << dataIn->getLength(); throw std::runtime_error("Non-constant input length!"); } + + if (uwd.check_gpsfix) { + try { + check_gps(); + } + catch (std::runtime_error& e) { + uwd.running = false; + etiLog.level(error) << e.what(); + } + } + mySyncBarrier.get()->wait(); + if (!uwd.running) { + worker.stop(); + first_run = true; + if (uwd.failed_due_to_fct) { + throw fct_discontinuity_error(); + } + else { + etiLog.level(error) << + "OutputUHD: Error, UHD worker failed"; + throw std::runtime_error("UHD worker failed"); + } + } + // write into the our buffer while // the worker sends the other. @@ -371,56 +408,226 @@ int OutputUHD::process(Buffer* dataIn, Buffer* dataOut) } return uwd.bufsize; +} + + +void OutputUHD::set_usrp_time() +{ + if (myConf.enableSync && (myConf.pps_src == "none")) { + etiLog.level(warn) << + "OutputUHD: WARNING:" + " you are using synchronous transmission without PPS input!"; + + struct timespec now; + if (clock_gettime(CLOCK_REALTIME, &now)) { + perror("OutputUHD:Error: could not get time: "); + etiLog.level(error) << "OutputUHD: could not get time"; + } + else { + myUsrp->set_time_now(uhd::time_spec_t(now.tv_sec)); + etiLog.level(info) << "OutputUHD: Setting USRP time to " << + uhd::time_spec_t(now.tv_sec).get_real_secs(); + } + } + + if (myConf.pps_src != "none") { + /* handling time for synchronisation: wait until the next full + * second, and set the USRP time at next PPS */ + struct timespec now; + time_t seconds; + if (clock_gettime(CLOCK_REALTIME, &now)) { + etiLog.level(error) << "OutputUHD: could not get time :" << + strerror(errno); + throw std::runtime_error("OutputUHD: could not get time."); + } + else { + seconds = now.tv_sec; + + MDEBUG("OutputUHD:sec+1: %ld ; now: %ld ...\n", seconds+1, now.tv_sec); + while (seconds + 1 > now.tv_sec) { + usleep(1); + if (clock_gettime(CLOCK_REALTIME, &now)) { + etiLog.level(error) << "OutputUHD: could not get time :" << + strerror(errno); + throw std::runtime_error("OutputUHD: could not get time."); + } + } + MDEBUG("OutputUHD:sec+1: %ld ; now: %ld ...\n", seconds+1, now.tv_sec); + /* We are now shortly after the second change. */ + + usleep(200000); // 200ms, we want the PPS to be later + myUsrp->set_time_unknown_pps(uhd::time_spec_t(seconds + 2)); + etiLog.level(info) << "OutputUHD: Setting USRP time next pps to " << + uhd::time_spec_t(seconds + 2).get_real_secs(); + } + usleep(1e6); + etiLog.log(info, "OutputUHD: USRP time %f\n", + myUsrp->get_time_now().get_real_secs()); + } } -void UHDWorker::process() +void OutputUHD::initial_gps_check() { - int workerbuffer = 0; - time_t tx_second = 0; - double pps_offset = 0; - double last_pps = 2.0; - double usrp_time; + if (first_gps_fix_check.tv_sec == 0) { + etiLog.level(info) << "Waiting for GPS fix"; - //const struct timespec hundred_nano = {0, 100}; + if (clock_gettime(CLOCK_MONOTONIC, &first_gps_fix_check) != 0) { + stringstream ss; + ss << "clock_gettime failure: " << strerror(errno); + throw std::runtime_error(ss.str()); + } + } - size_t sizeIn; - struct UHDWorkerFrameData* frame; + check_gps(); - size_t num_acc_samps; //number of accumulated samples - //int write_fail_count; + if (last_gps_fix_check.tv_sec > + first_gps_fix_check.tv_sec + initial_gps_fix_wait) { + stringstream ss; + ss << "GPS did not fix in " << initial_gps_fix_wait << " seconds"; + throw std::runtime_error(ss.str()); + } - // Transmit timeout - const double timeout = 0.2; + if (time_last_frame.tv_sec == 0) { + if (clock_gettime(CLOCK_MONOTONIC, &time_last_frame) != 0) { + stringstream ss; + ss << "clock_gettime failure: " << strerror(errno); + throw std::runtime_error(ss.str()); + } + } + + struct timespec now; + if (clock_gettime(CLOCK_MONOTONIC, &now) != 0) { + stringstream ss; + ss << "clock_gettime failure: " << strerror(errno); + throw std::runtime_error(ss.str()); + } + + long delta_us = timespecdiff_us(time_last_frame, now); + long wait_time_us = transmission_frame_duration_ms(myConf.dabMode); + + if (wait_time_us - delta_us > 0) { + usleep(wait_time_us - delta_us); + } + + time_last_frame.tv_nsec += wait_time_us * 1000; + if (time_last_frame.tv_nsec >= 1000000000L) { + time_last_frame.tv_nsec -= 1000000000L; + time_last_frame.tv_sec++; + } +} + +void OutputUHD::check_gps() +{ + struct timespec time_now; + if (clock_gettime(CLOCK_MONOTONIC, &time_now) != 0) { + stringstream ss; + ss << "clock_gettime failure: " << strerror(errno); + throw std::runtime_error(ss.str()); + } + + // Divide interval by two because we alternate between + // launch and check + if (uwd.check_gpsfix and + last_gps_fix_check.tv_sec + gps_fix_check_interval/2.0 < + time_now.tv_sec) { + last_gps_fix_check = time_now; + + // Alternate between launching thread and checking the + // result. + if (gps_fix_task.joinable()) { + if (gps_fix_future.has_value()) { + + gps_fix_future.wait(); + + gps_fix_task.join(); + + if (not gps_fix_future.get()) { + if (num_checks_without_gps_fix == 0) { + etiLog.level(alert) << + "OutputUHD: GPS Fix lost"; + } + num_checks_without_gps_fix++; + } + else { + if (num_checks_without_gps_fix) { + etiLog.level(info) << + "OutputUHD: GPS Fix recovered"; + } + num_checks_without_gps_fix = 0; + } + + if (gps_fix_check_interval * num_checks_without_gps_fix > + myConf.maxGPSHoldoverTime) { + std::stringstream ss; + ss << "Lost GPS fix for " << gps_fix_check_interval * + num_checks_without_gps_fix << " seconds"; + throw std::runtime_error(ss.str()); + } + } + } + else { + // Checking the sensor here takes too much + // time, it has to be done in a separate thread. + gps_fix_pt = boost::packaged_task<bool>( + boost::bind(check_gps_fix_ok, myUsrp) ); + + gps_fix_future = gps_fix_pt.get_future(); + + gps_fix_task = boost::thread(boost::move(gps_fix_pt)); + } + } +} + +//============================ UHD Worker ======================== + +void UHDWorker::process_errhandler() +{ + try { + process(); + } + catch (fct_discontinuity_error& e) { + etiLog.level(warn) << e.what(); + uwd->failed_due_to_fct = true; + } + + uwd->running = false; + uwd->sync_barrier.get()->wait(); + etiLog.level(warn) << "UHD worker terminated"; +} + +void UHDWorker::process() +{ + int workerbuffer = 0; + tx_second = 0; + pps_offset = 0.0; + last_pps = 2.0; #if FAKE_UHD == 0 uhd::stream_args_t stream_args("fc32"); //complex floats - uhd::tx_streamer::sptr myTxStream = uwd->myUsrp->get_tx_stream(stream_args); - size_t usrp_max_num_samps = myTxStream->get_max_num_samps(); -#else - size_t usrp_max_num_samps = 2048; // arbitrarily chosen + myTxStream = uwd->myUsrp->get_tx_stream(stream_args); #endif - const complexf* in; - - uhd::tx_metadata_t md; md.start_of_burst = false; - md.end_of_burst = false; + md.end_of_burst = false; + + expected_next_fct = -1; - int expected_next_fct = -1; + num_underflows = 0; + num_late_packets = 0; - while (running) { - bool fct_discontinuity = false; - md.has_time_spec = false; - md.time_spec = uhd::time_spec_t(0.0); - num_acc_samps = 0; - //write_fail_count = 0; + while (uwd->running) { + fct_discontinuity = false; + md.has_time_spec = false; + md.time_spec = uhd::time_spec_t(0.0); /* Wait for barrier */ // this wait will hopefully always be the second one // because modulation should be quicker than transmission uwd->sync_barrier.get()->wait(); + struct UHDWorkerFrameData* frame; + if (workerbuffer == 0) { frame = &(uwd->frame0); } @@ -432,241 +639,240 @@ void UHDWorker::process() "UHDWorker.process: workerbuffer is neither 0 nor 1 !"); } - in = reinterpret_cast<const complexf*>(frame->buf); - pps_offset = frame->ts.timestamp_pps_offset; + handle_frame(frame); - // Tx second from MNSC - tx_second = frame->ts.timestamp_sec; + // swap buffers + workerbuffer = (workerbuffer + 1) % 2; + } +} - sizeIn = uwd->bufsize / sizeof(complexf); +void UHDWorker::handle_frame(const struct UHDWorkerFrameData *frame) +{ + // Transmit timeout + static const double tx_timeout = 20.0; - /* Verify that the FCT value is correct. If we miss one transmission - * frame we must interrupt UHD and resync to the timestamps - */ - if (expected_next_fct != -1) { - if (expected_next_fct != (int)frame->ts.fct) { - uwd->logger->level(warn) << - "OutputUHD: Incorrect expect fct " << frame->ts.fct; + pps_offset = frame->ts.timestamp_pps_offset; - fct_discontinuity = true; - } - } + // Tx second from MNSC + tx_second = frame->ts.timestamp_sec; - expected_next_fct = (frame->ts.fct + uwd->fct_increment) % 250; + /* Verify that the FCT value is correct. If we miss one transmission + * frame we must interrupt UHD and resync to the timestamps + */ + if (frame->ts.fct == -1) { + etiLog.level(info) << + "OutputUHD: dropping one frame with invalid FCT"; + return; + } + if (expected_next_fct != -1) { + if (expected_next_fct != (int)frame->ts.fct) { + etiLog.level(warn) << + "OutputUHD: Incorrect expect fct " << frame->ts.fct << + ", expected " << expected_next_fct; + + fct_discontinuity = true; + throw fct_discontinuity_error(); + } + } - // Check for ref_lock - if (uwd->check_refclk_loss) - { - try { - // TODO: Is this check specific to the B100 and USRP2 ? - if (! uwd->myUsrp->get_mboard_sensor("ref_locked", 0).to_bool()) { - uwd->logger->log(alert, - "OutputUHD: External reference clock lock lost !"); - if (uwd->refclk_lock_loss_behaviour == CRASH) { - throw std::runtime_error( - "OutputUHD: External reference clock lock lost."); - } + expected_next_fct = (frame->ts.fct + uwd->fct_increment) % 250; + + // Check for ref_lock + if (uwd->check_refclk_loss) { + try { + // TODO: Is this check specific to the B100 and USRP2 ? + if (! uwd->myUsrp->get_mboard_sensor("ref_locked", 0).to_bool()) { + etiLog.log(alert, + "OutputUHD: External reference clock lock lost !"); + if (uwd->refclk_lock_loss_behaviour == CRASH) { + throw std::runtime_error( + "OutputUHD: External reference clock lock lost."); } } - catch (uhd::lookup_error &e) { - uwd->check_refclk_loss = false; - uwd->logger->log(warn, - "OutputUHD: This USRP does not have mboard sensor for ext clock loss." - " Check disabled."); - } } + catch (uhd::lookup_error &e) { + uwd->check_refclk_loss = false; + etiLog.log(warn, + "OutputUHD: This USRP does not have mboard sensor for ext clock loss." + " Check disabled."); + } + } - usrp_time = uwd->myUsrp->get_time_now().get_real_secs(); - - if (uwd->sourceContainsTimestamp) { - if (!frame->ts.timestamp_valid) { - /* We have not received a full timestamp through - * MNSC. We sleep through the frame. - */ - uwd->logger->level(info) << - "OutputUHD: Throwing sample " << frame->ts.fct << - " away: incomplete timestamp " << tx_second << - " + " << pps_offset; - usleep(20000); //TODO should this be TM-dependant ? - goto loopend; - } - - md.has_time_spec = true; - md.time_spec = uhd::time_spec_t(tx_second, pps_offset); - - // md is defined, let's do some checks - if (md.time_spec.get_real_secs() + 0.2 < usrp_time) { - uwd->logger->level(warn) << - "OutputUHD: Timestamp in the past! offset: " << - md.time_spec.get_real_secs() - usrp_time << - " (" << usrp_time << ")" - " frame " << frame->ts.fct << - ", tx_second " << tx_second << - ", pps " << pps_offset; - goto loopend; //skip the frame - } + double usrp_time = uwd->myUsrp->get_time_now().get_real_secs(); - if (md.time_spec.get_real_secs() > usrp_time + TIMESTAMP_MARGIN_FUTURE) { - uwd->logger->level(warn) << - "OutputUHD: Timestamp too far in the future! offset: " << - md.time_spec.get_real_secs() - usrp_time; - usleep(20000); //sleep so as to fill buffers - } - if (md.time_spec.get_real_secs() > usrp_time + TIMESTAMP_ABORT_FUTURE) { - uwd->logger->level(error) << - "OutputUHD: Timestamp way too far in the future! offset: " << - md.time_spec.get_real_secs() - usrp_time; - throw std::runtime_error("Timestamp error. Aborted."); - } + if (uwd->sourceContainsTimestamp) { + if (!frame->ts.timestamp_valid) { + /* We have not received a full timestamp through + * MNSC. We sleep through the frame. + */ + etiLog.level(info) << + "OutputUHD: Throwing sample " << frame->ts.fct << + " away: incomplete timestamp " << tx_second << + " + " << pps_offset; + usleep(20000); //TODO should this be TM-dependant ? + return; + } - if (last_pps > pps_offset) { - uwd->logger->log(info, - "OutputUHD (usrp time: %f): frame %d;" - " tx_second %zu; pps %.9f\n", - usrp_time, - frame->ts.fct, tx_second, pps_offset); - } + md.has_time_spec = true; + md.time_spec = uhd::time_spec_t(tx_second, pps_offset); + + // md is defined, let's do some checks + if (md.time_spec.get_real_secs() + tx_timeout < usrp_time) { + etiLog.level(warn) << + "OutputUHD: Timestamp in the past! offset: " << + md.time_spec.get_real_secs() - usrp_time << + " (" << usrp_time << ")" + " frame " << frame->ts.fct << + ", tx_second " << tx_second << + ", pps " << pps_offset; + return; + } + if (md.time_spec.get_real_secs() > usrp_time + TIMESTAMP_ABORT_FUTURE) { + etiLog.level(error) << + "OutputUHD: Timestamp way too far in the future! offset: " << + md.time_spec.get_real_secs() - usrp_time; + throw std::runtime_error("Timestamp error. Aborted."); } - else { // !uwd->sourceContainsTimestamp - if (uwd->muting || uwd->muteNoTimestamps) { - /* There was some error decoding the timestamp - */ - if (uwd->muting) { - uwd->logger->log(info, - "OutputUHD: Muting sample %d requested\n", - frame->ts.fct); - } - else { - uwd->logger->log(info, - "OutputUHD: Muting sample %d : no timestamp\n", - frame->ts.fct); - } - usleep(20000); - goto loopend; + } + else { // !uwd->sourceContainsTimestamp + if (uwd->muting || uwd->muteNoTimestamps) { + /* There was some error decoding the timestamp + */ + if (uwd->muting) { + etiLog.log(info, + "OutputUHD: Muting sample %d requested\n", + frame->ts.fct); } + else { + etiLog.log(info, + "OutputUHD: Muting sample %d : no timestamp\n", + frame->ts.fct); + } + usleep(20000); + return; } + } - PDEBUG("UHDWorker::process:max_num_samps: %zu.\n", - usrp_max_num_samps); + tx_frame(frame); - while (running && !uwd->muting && (num_acc_samps < sizeIn)) { - size_t samps_to_send = std::min(sizeIn - num_acc_samps, usrp_max_num_samps); + if (last_pps > pps_offset) { + if (num_underflows or num_late_packets) { + etiLog.log(info, + "OutputUHD status (usrp time: %f): " + "%d underruns and %d late packets since last status.\n", + usrp_time, + num_underflows, num_late_packets); + } + num_underflows = 0; + num_late_packets = 0; + } - //ensure the the last packet has EOB set if the timestamps has been - //refreshed and need to be reconsidered. - //Also, if we saw that the FCT did not increment as expected, which - //could be due to a lost incoming packet. - md.end_of_burst = ( - uwd->sourceContainsTimestamp && - (frame->ts.timestamp_refresh || fct_discontinuity) && - samps_to_send <= usrp_max_num_samps ); + last_pps = pps_offset; +} +void UHDWorker::tx_frame(const struct UHDWorkerFrameData *frame) +{ + const double tx_timeout = 20.0; + const size_t sizeIn = uwd->bufsize / sizeof(complexf); + const complexf* in_data = reinterpret_cast<const complexf*>(frame->buf); -#if FAKE_UHD - // This is probably very approximate - usleep( (1000000 / uwd->sampleRate) * samps_to_send); - size_t num_tx_samps = samps_to_send; +#if FAKE_UHD == 0 + size_t usrp_max_num_samps = myTxStream->get_max_num_samps(); #else - //send a single packet - size_t num_tx_samps = myTxStream->send( - &in[num_acc_samps], - samps_to_send, md, timeout); + size_t usrp_max_num_samps = 2048; // arbitrarily chosen #endif - num_acc_samps += num_tx_samps; + size_t num_acc_samps = 0; //number of accumulated samples + while (uwd->running && !uwd->muting && (num_acc_samps < sizeIn)) { + size_t samps_to_send = std::min(sizeIn - num_acc_samps, usrp_max_num_samps); - md.time_spec = uhd::time_spec_t(tx_second, pps_offset) - + uhd::time_spec_t(0, num_acc_samps/uwd->sampleRate); + //ensure the the last packet has EOB set if the timestamps has been + //refreshed and need to be reconsidered. + //Also, if we saw that the FCT did not increment as expected, which + //could be due to a lost incoming packet. + md.end_of_burst = ( + uwd->sourceContainsTimestamp && + (frame->ts.timestamp_refresh || fct_discontinuity) && + samps_to_send <= usrp_max_num_samps ); - /* - fprintf(stderr, "*** pps_offset %f, md.time_spec %f, usrp->now %f\n", - pps_offset, - md.time_spec.get_real_secs(), - uwd->myUsrp->get_time_now().get_real_secs()); - // */ - - if (num_tx_samps == 0) { -#if 1 - uwd->logger->log(warn, - "UHDWorker::process() unable to write to device, skipping frame!\n"); - break; +#if FAKE_UHD + // This is probably very approximate + usleep( (1000000 / uwd->sampleRate) * samps_to_send); + size_t num_tx_samps = samps_to_send; #else - // This has been disabled, because if there is a write failure, - // we'd better not insist and try to go on transmitting future - // frames. - // The goal is not to try to send by all means possible. It's - // more important to make sure the SFN is not disturbed. - - fprintf(stderr, "F"); - nanosleep(&hundred_nano, NULL); - write_fail_count++; - if (write_fail_count >= 3) { - double ts = md.time_spec.get_real_secs(); - double t_usrp = uwd->myUsrp->get_time_now().get_real_secs(); - - fprintf(stderr, "*** USRP write fail count %d\n", write_fail_count); - fprintf(stderr, "*** delta %f, md.time_spec %f, usrp->now %f\n", - ts - t_usrp, - ts, t_usrp); - - fprintf(stderr, "UHDWorker::process() unable to write to device, skipping frame!\n"); - break; - } + //send a single packet + size_t num_tx_samps = myTxStream->send( + &in_data[num_acc_samps], + samps_to_send, md, tx_timeout); #endif - } -#if FAKE_UHD == 0 - uhd::async_metadata_t async_md; - if (uwd->myUsrp->get_device()->recv_async_msg(async_md, 0)) { - const char* uhd_async_message = ""; - bool failure = true; - switch (async_md.event_code) { - case uhd::async_metadata_t::EVENT_CODE_BURST_ACK: - failure = false; - break; - case uhd::async_metadata_t::EVENT_CODE_UNDERFLOW: - uhd_async_message = "Underflow"; - break; - case uhd::async_metadata_t::EVENT_CODE_SEQ_ERROR: - uhd_async_message = "Packet loss between host and device."; - break; - case uhd::async_metadata_t::EVENT_CODE_TIME_ERROR: - uhd_async_message = "Packet had time that was late."; - break; - case uhd::async_metadata_t::EVENT_CODE_UNDERFLOW_IN_PACKET: - uhd_async_message = "Underflow occurred inside a packet."; - break; - case uhd::async_metadata_t::EVENT_CODE_SEQ_ERROR_IN_BURST: - uhd_async_message = "Packet loss within a burst."; - break; - default: - uhd_async_message = "unknown event code"; - break; - } + num_acc_samps += num_tx_samps; - if (failure) { - uwd->logger->level(alert) << "Near frame " << - frame->ts.fct << ": Received Async UHD Message '" << - uhd_async_message << "'"; + md.time_spec = uhd::time_spec_t(tx_second, pps_offset) + + uhd::time_spec_t(0, num_acc_samps/uwd->sampleRate); - } - } -#endif + if (num_tx_samps == 0) { + etiLog.log(warn, + "UHDWorker::process() unable to write to device, skipping frame!\n"); + break; } - last_pps = pps_offset; - -loopend: - // swap buffers - workerbuffer = (workerbuffer + 1) % 2; + print_async_metadata(frame); } +} - uwd->logger->level(warn) << "UHD worker terminated"; +void UHDWorker::print_async_metadata(const struct UHDWorkerFrameData *frame) +{ +#if FAKE_UHD == 0 + uhd::async_metadata_t async_md; + if (uwd->myUsrp->get_device()->recv_async_msg(async_md, 0)) { + const char* uhd_async_message = ""; + bool failure = false; + switch (async_md.event_code) { + case uhd::async_metadata_t::EVENT_CODE_BURST_ACK: + break; + case uhd::async_metadata_t::EVENT_CODE_UNDERFLOW: + uhd_async_message = "Underflow"; + num_underflows++; + break; + case uhd::async_metadata_t::EVENT_CODE_SEQ_ERROR: + uhd_async_message = "Packet loss between host and device."; + failure = true; + break; + case uhd::async_metadata_t::EVENT_CODE_TIME_ERROR: + uhd_async_message = "Packet had time that was late."; + num_late_packets++; + break; + case uhd::async_metadata_t::EVENT_CODE_UNDERFLOW_IN_PACKET: + uhd_async_message = "Underflow occurred inside a packet."; + failure = true; + break; + case uhd::async_metadata_t::EVENT_CODE_SEQ_ERROR_IN_BURST: + uhd_async_message = "Packet loss within a burst."; + failure = true; + break; + default: + uhd_async_message = "unknown event code"; + failure = true; + break; + } + + if (failure) { + etiLog.level(alert) << "Near frame " << + frame->ts.fct << ": Received Async UHD Message '" << + uhd_async_message << "'"; + + } + } +#endif } +// ======================================= +// Remote Control for UHD +// ======================================= void OutputUHD::set_parameter(const string& parameter, const string& value) { @@ -688,26 +894,21 @@ void OutputUHD::set_parameter(const string& parameter, const string& value) else if (parameter == "staticdelay") { int64_t adjust; ss >> adjust; - if (adjust > (myTFDurationMs * 1000)) - { // reset static delay for values outside range - myStaticDelayUs = 0; - } - else - { // the new adjust value is added to the existing delay and the result - // is wrapped around at TF duration - int newStaticDelayUs = myStaticDelayUs + adjust; - if (newStaticDelayUs > (myTFDurationMs * 1000)) - myStaticDelayUs = newStaticDelayUs - (myTFDurationMs * 1000); - else if (newStaticDelayUs < 0) - myStaticDelayUs = newStaticDelayUs + (myTFDurationMs * 1000); - else - myStaticDelayUs = newStaticDelayUs; - } - } - else if (parameter == "iqbalance") { - ss >> myConf.frequency; - myUsrp->set_tx_freq(myConf.frequency); - myConf.frequency = myUsrp->get_tx_freq(); + if (adjust > (myTFDurationMs * 1000)) + { // reset static delay for values outside range + myStaticDelayUs = 0; + } + else + { // the new adjust value is added to the existing delay and the result + // is wrapped around at TF duration + int newStaticDelayUs = myStaticDelayUs + adjust; + if (newStaticDelayUs > (myTFDurationMs * 1000)) + myStaticDelayUs = newStaticDelayUs - (myTFDurationMs * 1000); + else if (newStaticDelayUs < 0) + myStaticDelayUs = newStaticDelayUs + (myTFDurationMs * 1000); + else + myStaticDelayUs = newStaticDelayUs; + } } else { stringstream ss; @@ -741,3 +942,4 @@ const string OutputUHD::get_parameter(const string& parameter) const } #endif // HAVE_OUTPUT_UHD + diff --git a/src/OutputUHD.h b/src/OutputUHD.h index d002e98..633de04 100644 --- a/src/OutputUHD.h +++ b/src/OutputUHD.h @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2014 + Copyright (C) 2014, 2015 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -50,6 +50,7 @@ DESCRIPTION: #include <boost/thread/thread.hpp> #include <boost/thread/barrier.hpp> #include <boost/shared_ptr.hpp> +#include <boost/atomic.hpp> #include <list> #include <string> @@ -83,9 +84,20 @@ struct UHDWorkerFrameData { struct frame_timestamp ts; }; +struct fct_discontinuity_error : public std::exception +{ + const char* what () const throw () + { + return "FCT discontinuity detected"; + } +}; + enum refclk_lock_loss_behaviour_t { CRASH, IGNORE }; struct UHDWorkerData { + boost::atomic<bool> running; + bool failed_due_to_fct; + #if FAKE_UHD == 0 uhd::usrp::multi_usrp::sptr myUsrp; #endif @@ -109,6 +121,9 @@ struct UHDWorkerData { // If we want to verify loss of refclk bool check_refclk_loss; + // If we want to check for the gps_fixtype sensor + bool check_gpsfix; + // muting set by remote control bool muting; @@ -118,9 +133,6 @@ struct UHDWorkerData { // What to do when the reference clock PLL loses lock refclk_lock_loss_behaviour_t refclk_lock_loss_behaviour; - // The common logger - Logger* logger; - // What transmission mode we're using defines by how // much the FCT should increment for each // transmission frame. @@ -130,31 +142,44 @@ struct UHDWorkerData { class UHDWorker { public: - UHDWorker () { - running = false; - } - void start(struct UHDWorkerData *uhdworkerdata) { - running = true; uwd = uhdworkerdata; - uhd_thread = boost::thread(&UHDWorker::process, this); + + uwd->running = true; + uwd->failed_due_to_fct = false; + uhd_thread = boost::thread(&UHDWorker::process_errhandler, this); } void stop() { - running = false; + uwd->running = false; uhd_thread.interrupt(); uhd_thread.join(); } - void process(); + private: + // Asynchronous message statistics + int num_underflows; + int num_late_packets; + bool fct_discontinuity; + int expected_next_fct; + uhd::tx_metadata_t md; + time_t tx_second; + double pps_offset; + double last_pps; + + void print_async_metadata(const struct UHDWorkerFrameData *frame); + + void handle_frame(const struct UHDWorkerFrameData *frame); + void tx_frame(const struct UHDWorkerFrameData *frame); - private: struct UHDWorkerData *uwd; - bool running; boost::thread uhd_thread; uhd::tx_streamer::sptr myTxStream; + + void process(); + void process_errhandler(); }; /* This structure is used as initial configuration for OutputUHD */ @@ -171,7 +196,8 @@ struct OutputUHDConfig { double txgain; bool enableSync; bool muteNoTimestamps; - unsigned dabMode; + unsigned dabMode; + unsigned maxGPSHoldoverTime; /* allowed values : auto, int, sma, mimo */ std::string refclk_src; @@ -190,9 +216,7 @@ struct OutputUHDConfig { class OutputUHD: public ModOutput, public RemoteControllable { public: - OutputUHD( - OutputUHDConfig& config, - Logger& logger); + OutputUHD(const OutputUHDConfig& config); ~OutputUHD(); int process(Buffer* dataIn, Buffer* dataOut); @@ -218,13 +242,16 @@ class OutputUHD: public ModOutput, public RemoteControllable { protected: - Logger& myLogger; + OutputUHD(const OutputUHD& other); + OutputUHD& operator=(const OutputUHD& other); + EtiReader *myEtiReader; OutputUHDConfig myConf; uhd::usrp::multi_usrp::sptr myUsrp; boost::shared_ptr<boost::barrier> mySyncBarrier; UHDWorker worker; bool first_run; + bool gps_fix_verified; struct UHDWorkerData uwd; int activebuffer; @@ -232,14 +259,36 @@ class OutputUHD: public ModOutput, public RemoteControllable { bool myMuting; private: - // methods - void SetDelayBuffer(unsigned int dabMode); + // Resize the internal delay buffer according to the dabMode and + // the sample rate. + void SetDelayBuffer(unsigned int dabMode); // data int myStaticDelayUs; // static delay in microseconds - int myTFDurationMs; // TF duration in milliseconds + int myTFDurationMs; // TF duration in milliseconds std::vector<complexf> myDelayBuf; size_t lastLen; + + // GPS Fix check variables + int num_checks_without_gps_fix; + struct timespec first_gps_fix_check; + struct timespec last_gps_fix_check; + struct timespec time_last_frame; + boost::packaged_task<bool> gps_fix_pt; + boost::unique_future<bool> gps_fix_future; + boost::thread gps_fix_task; + + // Wait time in seconds to get fix + static const int initial_gps_fix_wait = 60; + + // Interval for checking the GPS at runtime + static const double gps_fix_check_interval = 10.0; // seconds + + void check_gps(); + + void set_usrp_time(); + + void initial_gps_check(); }; #endif // HAVE_OUTPUT_UHD diff --git a/src/OutputZeroMQ.cpp b/src/OutputZeroMQ.cpp index 793e473..da4473e 100644 --- a/src/OutputZeroMQ.cpp +++ b/src/OutputZeroMQ.cpp @@ -32,19 +32,31 @@ #if defined(HAVE_ZEROMQ) -OutputZeroMQ::OutputZeroMQ(std::string endpoint, Buffer* dataOut) +OutputZeroMQ::OutputZeroMQ(std::string endpoint, int type, Buffer* dataOut) : ModOutput(ModFormat(1), ModFormat(0)), + m_type(type), m_zmq_context(1), - m_zmq_pub_sock(m_zmq_context, ZMQ_PUB), + m_zmq_sock(m_zmq_context, type), m_endpoint(endpoint) { PDEBUG("OutputZeroMQ::OutputZeroMQ(%p) @ %p\n", dataOut, this); std::stringstream ss; - ss << "OutputZeroMQ(" << m_endpoint << ")"; + ss << "OutputZeroMQ(" << m_endpoint << " "; + + if (type == ZMQ_PUB) { + ss << "ZMQ_PUB"; + } + else if (type == ZMQ_REP) { + ss << "ZMQ_REP"; + } + else { + throw std::invalid_argument("ZMQ socket type unknown"); + } + ss << ")"; m_name = ss.str(); - m_zmq_pub_sock.bind(m_endpoint.c_str()); + m_zmq_sock.bind(m_endpoint.c_str()); } OutputZeroMQ::~OutputZeroMQ() @@ -58,7 +70,13 @@ int OutputZeroMQ::process(Buffer* dataIn, Buffer* dataOut) "(dataIn: %p, dataOut: %p)\n", dataIn, dataOut); - m_zmq_pub_sock.send(dataIn->getData(), dataIn->getLength()); + if (m_type == ZMQ_REP) { + // A ZMQ_REP socket requires a request first + zmq::message_t msg; + m_zmq_sock.recv(&msg); + } + + m_zmq_sock.send(dataIn->getData(), dataIn->getLength()); return dataIn->getLength(); } diff --git a/src/OutputZeroMQ.h b/src/OutputZeroMQ.h index a80eab4..85f85a7 100644 --- a/src/OutputZeroMQ.h +++ b/src/OutputZeroMQ.h @@ -39,14 +39,15 @@ class OutputZeroMQ : public ModOutput { public: - OutputZeroMQ(std::string endpoint, Buffer* dataOut = NULL); + OutputZeroMQ(std::string endpoint, int type, Buffer* dataOut = NULL); virtual ~OutputZeroMQ(); virtual int process(Buffer* dataIn, Buffer* dataOut); const char* name() { return m_name.c_str(); } protected: + int m_type; // zmq socket type zmq::context_t m_zmq_context; // handle for the zmq context - zmq::socket_t m_zmq_pub_sock; // handle for the zmq publisher socket + zmq::socket_t m_zmq_sock; // handle for the zmq publisher socket std::string m_endpoint; // On which port to listen: e.g. // tcp://*:58300 diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp index 7a2af00..21a6c81 100644 --- a/src/RemoteControl.cpp +++ b/src/RemoteControl.cpp @@ -58,8 +58,10 @@ void RemoteControllerTelnet::restart_thread(long) void RemoteControllerTelnet::process(long) { - m_welcome = "ODR-DabMod Remote Control CLI\nWrite 'help' for help.\n**********\n"; - m_prompt = "> "; + std::string m_welcome = "ODR-DabMod Remote Control CLI\n" + "Write 'help' for help.\n" + "**********\n"; + std::string m_prompt = "> "; std::string in_message; size_t length; diff --git a/src/RemoteControl.h b/src/RemoteControl.h index 89a1583..1b5e447 100644 --- a/src/RemoteControl.h +++ b/src/RemoteControl.h @@ -34,7 +34,7 @@ #endif #if defined(HAVE_ZEROMQ) -#include <zmq.hpp> +#include "zmq.hpp" #endif #include <list> @@ -50,6 +50,7 @@ #include <boost/thread.hpp> #include <stdexcept> +#include "Log.h" #define RC_ADD_PARAMETER(p, desc) { \ std::vector<std::string> p; \ @@ -114,8 +115,8 @@ class RemoteControllers { it != m_controllers.end(); ++it) { if ((*it)->fault_detected()) { - fprintf(stderr, - "Detected Remote Control fault, restarting it\n"); + etiLog.level(warn) << + "Detected Remote Control fault, restarting it"; (*it)->restart(); } } @@ -289,9 +290,6 @@ class RemoteControllerTelnet : public BaseRemoteController { /* This controller commands the controllables in the cohort */ std::list<RemoteControllable*> m_cohort; - std::string m_welcome; - std::string m_prompt; - int m_port; }; diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h index 78e9ef0..e5e83ef 100644 --- a/src/ThreadsafeQueue.h +++ b/src/ThreadsafeQueue.h @@ -38,25 +38,14 @@ * that pushes elements into the queue, and one consumer that * retrieves the elements. * - * The queue can make the consumer block until enough elements - * are available. + * The queue can make the consumer block until an element + * is available. */ template<typename T> class ThreadsafeQueue { public: - /* Create a new queue without any minimum required - * fill before it is possible to pop an element - */ - ThreadsafeQueue() : the_required_size(1) {} - - /* Create a queue where it has to contain at least - * required_size elements before pop is possible - */ - ThreadsafeQueue(size_t required_size) : the_required_size(required_size) { - } - /* Push one element into the queue, and notify another thread that * might be waiting. * @@ -87,14 +76,14 @@ public: size_t size() const { + boost::mutex::scoped_lock lock(the_mutex); return the_queue.size(); } bool try_pop(T& popped_value) { boost::mutex::scoped_lock lock(the_mutex); - if(the_queue.size() < the_required_size) - { + if (the_queue.empty()) { return false; } @@ -103,10 +92,10 @@ public: return true; } - void wait_and_pop(T& popped_value) + void wait_and_pop(T& popped_value, size_t prebuffering = 1) { boost::mutex::scoped_lock lock(the_mutex); - while(the_queue.size() < the_required_size) { + while (the_queue.size() < prebuffering) { the_condition_variable.wait(lock); } @@ -118,7 +107,6 @@ private: std::queue<T> the_queue; mutable boost::mutex the_mutex; boost::condition_variable the_condition_variable; - size_t the_required_size; }; #endif diff --git a/src/TimestampDecoder.cpp b/src/TimestampDecoder.cpp index 96c84c0..5044366 100644 --- a/src/TimestampDecoder.cpp +++ b/src/TimestampDecoder.cpp @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2014 + Copyright (C) 2014, 2015 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -29,6 +29,7 @@ #include <fstream> #include <string> #include <boost/lexical_cast.hpp> +#include <boost/make_shared.hpp> #include <sys/types.h> #include "PcDebug.h" #include "TimestampDecoder.h" @@ -41,7 +42,8 @@ void TimestampDecoder::calculateTimestamp(struct frame_timestamp& ts) { - struct frame_timestamp* ts_queued = new struct frame_timestamp; + boost::shared_ptr<struct frame_timestamp> ts_queued = + boost::make_shared<struct frame_timestamp>(); /* Push new timestamp into queue */ ts_queued->timestamp_valid = full_timestamp_received_mnsc; @@ -62,14 +64,14 @@ void TimestampDecoder::calculateTimestamp(struct frame_timestamp& ts) * * Therefore, use <= and not < for comparison */ - if (queue_timestamps.size() <= modconfig.delay_calculation_pipeline_stages) { - //fprintf(stderr, "* %zu %u ", queue_timestamps.size(), modconfig.delay_calculation_pipeline_stages); + if (queue_timestamps.size() <= m_tist_delay_stages) { + //fprintf(stderr, "* %zu %u ", queue_timestamps.size(), m_tist_delay_stages); /* Return invalid timestamp until the queue is full */ ts.timestamp_valid = false; ts.timestamp_sec = 0; ts.timestamp_pps_offset = 0; ts.timestamp_refresh = false; - ts.fct = 0; + ts.fct = -1; } else { //fprintf(stderr, ". %zu ", queue_timestamps.size()); @@ -87,16 +89,14 @@ void TimestampDecoder::calculateTimestamp(struct frame_timestamp& ts) ts.timestamp_sec, ts.timestamp_pps_offset, ts.timestamp_refresh);*/ - - delete ts_queued; } MDEBUG("Timestamp queue size %zu, delay_calc %u\n", queue_timestamps.size(), - modconfig.delay_calculation_pipeline_stages); + m_tist_delay_stages); - if (queue_timestamps.size() > modconfig.delay_calculation_pipeline_stages) { - myLogger.level(error) << "Error: Timestamp queue is too large : size " << + if (queue_timestamps.size() > m_tist_delay_stages) { + etiLog.level(error) << "Error: Timestamp queue is too large : size " << queue_timestamps.size() << "! This should not happen !"; } @@ -191,84 +191,48 @@ void TimestampDecoder::updateTimestampEti( int framephase, uint16_t mnsc, double pps, - uint32_t fct) + int32_t fct) { updateTimestampPPS(pps); pushMNSCData(framephase, mnsc); latestFCT = fct; } - -bool TimestampDecoder::updateModulatorOffset() +void TimestampDecoder::set_parameter( + const std::string& parameter, + const std::string& value) { using namespace std; - using boost::lexical_cast; - using boost::bad_lexical_cast; - if (modconfig.use_offset_fixed) - { - timestamp_offset = modconfig.offset_fixed; - return true; - } - else if (modconfig.use_offset_file) - { - bool r = false; - double newoffset; + stringstream ss(value); + ss.exceptions ( stringstream::failbit | stringstream::badbit ); - std::string filedata; - ifstream filestream; - - try - { - filestream.open(modconfig.offset_filename.c_str()); - if (!filestream.eof()) - { - getline(filestream, filedata); - try - { - newoffset = lexical_cast<double>(filedata); - r = true; - } - catch (bad_lexical_cast& e) - { - myLogger.level(error) << - "Error parsing timestamp offset from file '" << - modconfig.offset_filename << "'"; - r = false; - } - } - else - { - myLogger.level(error) << - "Error reading from timestamp offset file: eof reached\n"; - r = false; - } - filestream.close(); - } - catch (exception& e) - { - myLogger.level(error) << "Error opening timestamp offset file\n"; - r = false; - } - - - if (r) - { - if (timestamp_offset != newoffset) - { - timestamp_offset = newoffset; - myLogger.level(info) << - "TimestampDecoder::updateTimestampOffset: new offset is " << - timestamp_offset; - offset_changed = true; - } + if (parameter == "offset") { + ss >> timestamp_offset; + offset_changed = true; + } + else { + stringstream ss; + ss << "Parameter '" << parameter + << "' is not exported by controllable " << get_rc_name(); + throw ParameterError(ss.str()); + } +} - } +const std::string TimestampDecoder::get_parameter( + const std::string& parameter) const +{ + using namespace std; - return r; + stringstream ss; + if (parameter == "offset") { + ss << timestamp_offset; } else { - return false; + ss << "Parameter '" << parameter << + "' is not exported by controllable " << get_rc_name(); + throw ParameterError(ss.str()); } + return ss.str(); } diff --git a/src/TimestampDecoder.h b/src/TimestampDecoder.h index 0c393e4..d8ab633 100644 --- a/src/TimestampDecoder.h +++ b/src/TimestampDecoder.h @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2014 + Copyright (C) 2014, 2015 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -28,34 +28,19 @@ #define TIMESTAMP_DECODER_H #include <queue> +#include <boost/shared_ptr.hpp> #include <string> #include <time.h> #include <math.h> #include <stdio.h> #include "Eti.h" #include "Log.h" - -struct modulator_offset_config -{ - bool use_offset_fixed; - double offset_fixed; - /* These two fields are used when the modulator is run with a fixed offset */ - - bool use_offset_file; - std::string offset_filename; - /* These two fields are used when the modulator reads the offset from a file */ - - unsigned delay_calculation_pipeline_stages; - /* Specifies by how many stages the timestamp must be delayed. - * (e.g. The FIRFilter is pipelined, therefore we must increase - * delay_calculation_pipeline_stages by one if the filter is used - */ -}; +#include "RemoteControl.h" struct frame_timestamp { // Which frame count does this timestamp apply to - uint32_t fct; + int32_t fct; uint32_t timestamp_sec; double timestamp_pps_offset; @@ -101,21 +86,32 @@ struct frame_timestamp void print(const char* t) { fprintf(stderr, - "%s <struct frame_timestamp(%s, %d, %.9f)>\n", + "%s <struct frame_timestamp(%s, %d, %.9f, %d)>\n", t, this->timestamp_valid ? "valid" : "invalid", - this->timestamp_sec, this->timestamp_pps_offset); + this->timestamp_sec, this->timestamp_pps_offset, + this->fct); } }; /* This module decodes MNSC time information */ -class TimestampDecoder +class TimestampDecoder : public RemoteControllable { public: TimestampDecoder( - struct modulator_offset_config& config, - Logger& logger): - myLogger(logger), modconfig(config) + /* The modulator adds this offset to the TIST to define time of + * frame transmission + */ + double offset_s, + + /* Specifies by how many stages the timestamp must be delayed. + * (e.g. The FIRFilter is pipelined, therefore we must increase + * tist_delay_stages by one if the filter is used + */ + unsigned tist_delay_stages) : + RemoteControllable("tist") { + timestamp_offset = offset_s; + m_tist_delay_stages = tist_delay_stages; inhibit_second_update = 0; time_pps = 0.0; time_secs = 0; @@ -125,10 +121,10 @@ class TimestampDecoder gmtime_r(0, &temp_time); offset_changed = false; - myLogger.level(info) << "Setting up timestamp decoder with " << - (modconfig.use_offset_fixed ? "fixed" : - (modconfig.use_offset_file ? "dynamic" : "none")) << - " offset"; + RC_ADD_PARAMETER(offset, "TIST offset [s]"); + + etiLog.level(info) << "Setting up timestamp decoder with " << + timestamp_offset << " offset"; }; @@ -140,16 +136,25 @@ class TimestampDecoder int framephase, uint16_t mnsc, double pps, - uint32_t fct); + int32_t fct); - /* Update the modulator timestamp offset according to the modconf + /*********** REMOTE CONTROL ***************/ + /* virtual void enrol_at(BaseRemoteController& controller) + * is inherited */ - bool updateModulatorOffset(); - protected: - /* Main program logger */ - Logger& myLogger; + /* Base function to set parameters. */ + virtual void set_parameter(const std::string& parameter, + const std::string& value); + + /* Getting a parameter always returns a string. */ + virtual const std::string get_parameter( + const std::string& parameter) const; + const char* name() { return "TS"; } + + + protected: /* Push a new MNSC field into the decoder */ void pushMNSCData(int framephase, uint16_t mnsc); @@ -167,15 +172,13 @@ class TimestampDecoder struct tm temp_time; uint32_t time_secs; - uint32_t latestFCT; + int32_t latestFCT; double time_pps; double timestamp_offset; + unsigned m_tist_delay_stages; int inhibit_second_update; bool offset_changed; - /* configuration for the offset management */ - struct modulator_offset_config& modconfig; - /* When the type or identifier don't match, the decoder must * be disabled */ @@ -189,8 +192,9 @@ class TimestampDecoder * synchronise two modulators if only one uses (for instance) the * FIRFilter (1 stage pipeline) */ - std::queue<struct frame_timestamp*> queue_timestamps; + std::queue<boost::shared_ptr<struct frame_timestamp> > queue_timestamps; }; #endif + diff --git a/src/Utils.cpp b/src/Utils.cpp new file mode 100644 index 0000000..6c9b0fc --- /dev/null +++ b/src/Utils.cpp @@ -0,0 +1,118 @@ +/* + Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012 + Her Majesty the Queen in Right of Canada (Communications Research + Center Canada) + + Copyright (C) 2015 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://opendigitalradio.org + */ +/* + This file is part of ODR-DabMod. + + ODR-DabMod is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMod is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "Utils.h" +#include "GainControl.h" + +void printUsage(char* progName) +{ + FILE* out = stderr; + + fprintf(out, "Welcome to %s %s, compiled at %s, %s\n\n", + PACKAGE, +#if defined(GITVERSION) + GITVERSION, +#else + VERSION, +#endif + __DATE__, __TIME__); + fprintf(out, "Usage with configuration file:\n"); + fprintf(out, "\t%s [-C] config_file.ini\n\n", progName); + + fprintf(out, "Usage with command line options:\n"); + fprintf(out, "\t%s" + " input" + " (-f filename | -u uhddevice -F frequency) " + " [-G txgain]" + " [-o offset]" + " [-T filter_taps_file]" + " [-a gain]" + " [-c clockrate]" + " [-g gainMode]" + " [-h]" + " [-l]" + " [-m dabMode]" + " [-r samplingRate]" + "\n", progName); + fprintf(out, "Where:\n"); + fprintf(out, "input: ETI input filename (default: stdin).\n"); + fprintf(out, "-f name: Use file output with given filename. (use /dev/stdout for standard output)\n"); + fprintf(out, "-u device: Use UHD output with given device string. (use "" for default device)\n"); + fprintf(out, "-F frequency: Set the transmit frequency when using UHD output. (mandatory option when using UHD)\n"); + fprintf(out, "-G txgain: Set the transmit gain for the UHD driver (default: 0)\n"); + fprintf(out, "-o: (UHD only) Set the timestamp offset added to the timestamp in the ETI. The offset is a double.\n"); + fprintf(out, " Specifying this option has two implications: It enables synchronous transmission,\n" + " requiring an external REFCLK and PPS signal and frames that do not contain a valid timestamp\n" + " get muted.\n\n"); + fprintf(out, "-T taps_file: Enable filtering before the output, using the specified file containing the filter taps.\n"); + fprintf(out, "-a gain: Apply digital amplitude gain.\n"); + fprintf(out, "-c rate: Set the DAC clock rate and enable Cic Equalisation.\n"); + fprintf(out, "-g: Set computation gain mode: " + "%u FIX, %u MAX, %u VAR\n", GAIN_FIX, GAIN_MAX, GAIN_VAR); + fprintf(out, "-h: Print this help.\n"); + fprintf(out, "-l: Loop file when reach end of file.\n"); + fprintf(out, "-m mode: Set DAB mode: (0: auto, 1-4: force).\n"); + fprintf(out, "-r rate: Set output sampling rate (default: 2048000).\n\n"); +} + + +void printVersion(void) +{ + FILE *out = stderr; + + fprintf(out, "Welcome to %s %s, compiled at %s, %s\n\n", + PACKAGE, VERSION, __DATE__, __TIME__); + fprintf(out, + " ODR-DabMod is copyright (C) Her Majesty the Queen in Right of Canada,\n" + " 2009, 2010, 2011, 2012 Communications Research Centre (CRC),\n" + " and\n" + " Copyright (C) 2014, 2015 Matthias P. Braendli, matthias.braendli@mpb.li\n" + "\n" + " http://opendigitalradio.org\n" + "\n" + " ODR-DabMod is free software: you can redistribute it and/or modify it\n" + " under the terms of the GNU General Public License as published by the\n" + " Free Software Foundation, either version 3 of the License, or (at your\n" + " option) any later version.\n" + "\n" + " ODR-DabMod is distributed in the hope that it will be useful, but\n" + " WITHOUT ANY WARRANTY; without even the implied warranty of\n" + " MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU\n" + " General Public License for more details.\n" + "\n" + " You should have received a copy of the GNU General Public License along\n" + " with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.\n" + "\n" +#if USE_KISS_FFT + "ODR-DabMod makes use of the following open source packages:\n" + " Kiss FFT v1.2.9 (Revised BSD) - http://kissfft.sourceforge.net/\n" +#endif + ); + +} + + diff --git a/src/Utils.h b/src/Utils.h new file mode 100644 index 0000000..f023646 --- /dev/null +++ b/src/Utils.h @@ -0,0 +1,62 @@ +/* + Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012 + Her Majesty the Queen in Right of Canada (Communications Research + Center Canada) + + Copyright (C) 2015 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://opendigitalradio.org + */ +/* + This file is part of ODR-DabMod. + + ODR-DabMod is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMod is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __UTILS_H_ +#define __UTILS_H_ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include <stdlib.h> +#include <unistd.h> +#include <stdio.h> +#include <time.h> + +void printUsage(char* progName); + +void printVersion(void); + +inline long timespecdiff_us(struct timespec& oldTime, struct timespec& time) +{ + long tv_sec; + long tv_nsec; + if (time.tv_nsec < oldTime.tv_nsec) { + tv_sec = time.tv_sec - 1 - oldTime.tv_sec; + tv_nsec = 1000000000L + time.tv_nsec - oldTime.tv_nsec; + } + else { + tv_sec = time.tv_sec - oldTime.tv_sec; + tv_nsec = time.tv_nsec - oldTime.tv_nsec; + } + + return tv_sec * 1000 + tv_nsec / 1000; +} + + +#endif + |