/* Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org */ /* This file is part of ODR-DabMod. ODR-DabMod is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. ODR-DabMod is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with ODR-DabMod. If not, see . */ #ifdef HAVE_CONFIG_H # include "config.h" #endif #include "porting.h" #include "Utils.h" #include "Log.h" #include "DabModulator.h" #include "InputMemory.h" #include "OutputFile.h" #include "FormatConverter.h" #if defined(HAVE_OUTPUT_UHD) # include "OutputUHD.h" #endif #if defined(HAVE_SOAPYSDR) # include "OutputSoapy.h" #endif #include "OutputZeroMQ.h" #include "InputReader.h" #include "PcDebug.h" #include "TimestampDecoder.h" #include "FIRFilter.h" #include "RemoteControl.h" #include #include #include #include #include #include #include #include #include #include #include #include #if HAVE_NETINET_IN_H # include #endif #if HAVE_DECL__MM_MALLOC # include #else # define memalign(a, b) malloc(b) #endif #define ZMQ_INPUT_MAX_FRAME_QUEUE 500 typedef std::complex complexf; using namespace std; volatile sig_atomic_t running = 1; void signalHandler(int signalNb) { PDEBUG("signalHandler(%i)\n", signalNb); running = 0; } struct modulator_data { modulator_data() : inputReader(nullptr), framecount(0), flowgraph(nullptr), etiReader(nullptr), rcs(nullptr) {} InputReader* inputReader; Buffer data; uint64_t framecount; Flowgraph* flowgraph; EtiReader* etiReader; RemoteControllers* rcs; }; enum class run_modulator_state_t { failure, // Corresponds to all failures normal_end, // Number of frames to modulate was reached again, // ZeroMQ overrun reconfigure // Some sort of change of configuration we cannot handle happened }; run_modulator_state_t run_modulator(modulator_data& m); static GainMode parse_gainmode(const std::string &gainMode_setting) { string gainMode_minuscule(gainMode_setting); std::transform(gainMode_minuscule.begin(), gainMode_minuscule.end(), gainMode_minuscule.begin(), ::tolower); if (gainMode_minuscule == "0" or gainMode_minuscule == "fix") { return GainMode::GAIN_FIX; } else if (gainMode_minuscule == "1" or gainMode_minuscule == "max") { return GainMode::GAIN_MAX; } else if (gainMode_minuscule == "2" or gainMode_minuscule == "var") { return GainMode::GAIN_VAR; } cerr << "Modulator gainmode setting '" << gainMode_setting << "' not recognised." << endl; throw std::runtime_error("Configuration error"); } int launch_modulator(int argc, char* argv[]) { int ret = 0; bool loop = false; std::string inputName = ""; std::string inputTransport = "file"; unsigned inputMaxFramesQueued = ZMQ_INPUT_MAX_FRAME_QUEUE; float edi_max_delay_ms = 0.0f; std::string outputName; int useZeroMQOutput = 0; std::string zmqOutputSocketType = ""; int useFileOutput = 0; std::string fileOutputFormat = "complexf"; int useUHDOutput = 0; int useSoapyOutput = 0; size_t outputRate = 2048000; size_t clockRate = 0; unsigned dabMode = 0; float digitalgain = 1.0f; float normalise = 1.0f; GainMode gainMode = GainMode::GAIN_VAR; tii_config_t tiiConfig; /* UHD requires the input I and Q samples to be in the interval * [-1.0,1.0], otherwise they get truncated, which creates very * wide-spectrum spikes. Depending on the Transmission Mode, the * Gain Mode and the sample rate (and maybe other parameters), the * samples can have peaks up to about 48000. The value of 50000 * should guarantee that with a digital gain of 1.0, UHD never clips * our samples. */ const float normalise_factor = 50000.0f; std::string filterTapsFilename = ""; // Two configuration sources exist: command line and (new) INI file bool use_configuration_cmdline = false; bool use_configuration_file = false; std::string configuration_file; #if defined(HAVE_OUTPUT_UHD) OutputUHDConfig outputuhd_conf; #endif #if defined(HAVE_SOAPYSDR) OutputSoapyConfig outputsoapy_conf; #endif modulator_data m; // To handle the timestamp offset of the modulator unsigned tist_delay_stages = 0; double tist_offset_s = 0.0; auto flowgraph = make_shared(); shared_ptr format_converter; shared_ptr output; m.rcs = &rcs; bool run_again = true; InputFileReader inputFileReader; #if defined(HAVE_ZEROMQ) auto inputZeroMQReader = make_shared(); #endif auto inputTcpReader = make_shared(); struct sigaction sa; memset(&sa, 0, sizeof(struct sigaction)); sa.sa_handler = &signalHandler; if (sigaction(SIGINT, &sa, NULL) == -1) { perror("sigaction"); return EXIT_FAILURE; } // Set timezone to UTC setenv("TZ", "", 1); tzset(); while (true) { int c = getopt(argc, argv, "a:C:c:f:F:g:G:hlm:o:O:r:T:u:V"); if (c == -1) { break; } if (c != 'C') { use_configuration_cmdline = true; } switch (c) { case 'C': use_configuration_file = true; configuration_file = optarg; break; case 'a': digitalgain = strtof(optarg, NULL); break; case 'c': clockRate = strtol(optarg, NULL, 0); break; case 'f': #if defined(HAVE_OUTPUT_UHD) if (useUHDOutput) { fprintf(stderr, "Options -u and -f are mutually exclusive\n"); throw std::invalid_argument("Invalid command line options"); } #endif outputName = optarg; useFileOutput = 1; break; case 'F': #if defined(HAVE_OUTPUT_UHD) outputuhd_conf.frequency = strtof(optarg, NULL); #endif break; case 'g': gainMode = parse_gainmode(optarg); break; case 'G': #if defined(HAVE_OUTPUT_UHD) outputuhd_conf.txgain = strtod(optarg, NULL); #endif break; case 'l': loop = true; break; case 'o': tist_offset_s = strtod(optarg, NULL); #if defined(HAVE_OUTPUT_UHD) outputuhd_conf.enableSync = true; #endif break; case 'm': dabMode = strtol(optarg, NULL, 0); break; case 'r': outputRate = strtol(optarg, NULL, 0); break; case 'T': filterTapsFilename = optarg; break; case 'u': #if defined(HAVE_OUTPUT_UHD) if (useFileOutput) { fprintf(stderr, "Options -u and -f are mutually exclusive\n"); throw std::invalid_argument("Invalid command line options"); } outputuhd_conf.device = optarg; outputuhd_conf.refclk_src = "internal"; outputuhd_conf.pps_src = "none"; outputuhd_conf.pps_polarity = "pos"; useUHDOutput = 1; #endif break; case 'V': printVersion(); throw std::invalid_argument(""); break; case '?': case 'h': printUsage(argv[0]); throw std::invalid_argument(""); break; default: fprintf(stderr, "Option '%c' not coded yet!\n", c); ret = -1; throw std::invalid_argument("Invalid command line options"); } } std::cerr << "ODR-DabMod version " << #if defined(GITVERSION) GITVERSION #else VERSION #endif << std::endl; std::cerr << "Compiled with features: " << #if defined(HAVE_ZEROMQ) "zeromq " << #endif #if defined(HAVE_OUTPUT_UHD) "output_uhd " << #endif #if defined(HAVE_SOAPYSDR) "output_soapysdr " << #endif #if defined(__FAST_MATH__) "fast-math" << #endif "\n"; if (use_configuration_file && use_configuration_cmdline) { fprintf(stderr, "Warning: configuration file and command line parameters are defined:\n\t" "Command line parameters override settings in the configuration file !\n"); } // No argument given ? You can't be serious ! Show usage. if (argc == 1) { printUsage(argv[0]); throw std::invalid_argument("Invalid command line options"); } // If only one argument is given, interpret as configuration file name if (argc == 2) { use_configuration_file = true; configuration_file = argv[1]; } if (use_configuration_file) { // First read parameters from the file using boost::property_tree::ptree; ptree pt; try { read_ini(configuration_file, pt); } catch (boost::property_tree::ini_parser::ini_parser_error &e) { std::cerr << "Error, cannot read configuration file '" << configuration_file.c_str() << "'" << std::endl; std::cerr << " " << e.what() << std::endl; throw std::runtime_error("Cannot read configuration file"); } // remote controller: if (pt.get("remotecontrol.telnet", 0) == 1) { try { int telnetport = pt.get("remotecontrol.telnetport"); auto telnetrc = make_shared(telnetport); rcs.add_controller(telnetrc); } catch (std::exception &e) { std::cerr << "Error: " << e.what() << "\n"; std::cerr << " telnet remote control enabled, but no telnetport defined.\n"; throw std::runtime_error("Configuration error"); } } #if defined(HAVE_ZEROMQ) if (pt.get("remotecontrol.zmqctrl", 0) == 1) { try { std::string zmqCtrlEndpoint = pt.get("remotecontrol.zmqctrlendpoint", ""); std::cerr << "ZmqCtrlEndpoint: " << zmqCtrlEndpoint << std::endl; auto zmqrc = make_shared(zmqCtrlEndpoint); rcs.add_controller(zmqrc); } catch (std::exception &e) { std::cerr << "Error: " << e.what() << "\n"; std::cerr << " zmq remote control enabled, but no endpoint defined.\n"; throw std::runtime_error("Configuration error"); } } #endif // input params: if (pt.get("input.loop", 0) == 1) { loop = true; } inputTransport = pt.get("input.transport", "file"); inputMaxFramesQueued = pt.get("input.max_frames_queued", ZMQ_INPUT_MAX_FRAME_QUEUE); edi_max_delay_ms = pt.get("input.edi_max_delay", 0.0f); inputName = pt.get("input.source", "/dev/stdin"); // log parameters: if (pt.get("log.syslog", 0) == 1) { LogToSyslog* log_syslog = new LogToSyslog(); etiLog.register_backend(log_syslog); } if (pt.get("log.filelog", 0) == 1) { std::string logfilename; try { logfilename = pt.get("log.filename"); } catch (std::exception &e) { std::cerr << "Error: " << e.what() << "\n"; std::cerr << " Configuration enables file log, but does not specify log filename\n"; throw std::runtime_error("Configuration error"); } LogToFile* log_file = new LogToFile(logfilename); etiLog.register_backend(log_file); } auto trace_filename = pt.get("log.trace", ""); if (not trace_filename.empty()) { LogTracer* tracer = new LogTracer(trace_filename); etiLog.register_backend(tracer); } // modulator parameters: const string gainMode_setting = pt.get("modulator.gainmode", "var"); gainMode = parse_gainmode(gainMode_setting); dabMode = pt.get("modulator.mode", dabMode); clockRate = pt.get("modulator.dac_clk_rate", (size_t)0); digitalgain = pt.get("modulator.digital_gain", digitalgain); outputRate = pt.get("modulator.rate", outputRate); // FIR Filter parameters: if (pt.get("firfilter.enabled", 0) == 1) { filterTapsFilename = pt.get("firfilter.filtertapsfile", "default"); } // Output options std::string output_selected; try { output_selected = pt.get("output.output"); } catch (std::exception &e) { std::cerr << "Error: " << e.what() << "\n"; std::cerr << " Configuration does not specify output\n"; throw std::runtime_error("Configuration error"); } if (output_selected == "file") { try { outputName = pt.get("fileoutput.filename"); } catch (std::exception &e) { std::cerr << "Error: " << e.what() << "\n"; std::cerr << " Configuration does not specify file name for file output\n"; throw std::runtime_error("Configuration error"); } useFileOutput = 1; fileOutputFormat = pt.get("fileoutput.format", fileOutputFormat); } #if defined(HAVE_OUTPUT_UHD) else if (output_selected == "uhd") { outputuhd_conf.device = pt.get("uhdoutput.device", ""); outputuhd_conf.usrpType = pt.get("uhdoutput.type", ""); outputuhd_conf.subDevice = pt.get("uhdoutput.subdevice", ""); outputuhd_conf.masterClockRate = pt.get("uhdoutput.master_clock_rate", 0); if (outputuhd_conf.device.find("master_clock_rate") != std::string::npos) { std::cerr << "Warning:" "setting master_clock_rate in [uhd] device is deprecated !\n"; } if (outputuhd_conf.device.find("type=") != std::string::npos) { std::cerr << "Warning:" "setting type in [uhd] device is deprecated !\n"; } outputuhd_conf.txgain = pt.get("uhdoutput.txgain", 0.0); outputuhd_conf.frequency = pt.get("uhdoutput.frequency", 0); std::string chan = pt.get("uhdoutput.channel", ""); outputuhd_conf.dabMode = dabMode; if (outputuhd_conf.frequency == 0 && chan == "") { std::cerr << " UHD output enabled, but neither frequency nor channel defined.\n"; throw std::runtime_error("Configuration error"); } else if (outputuhd_conf.frequency == 0) { outputuhd_conf.frequency = parseChannel(chan); } else if (outputuhd_conf.frequency != 0 && chan != "") { std::cerr << " UHD output: cannot define both frequency and channel.\n"; throw std::runtime_error("Configuration error"); } outputuhd_conf.lo_offset = pt.get("uhdoutput.lo_offset", 0); outputuhd_conf.refclk_src = pt.get("uhdoutput.refclk_source", "internal"); outputuhd_conf.pps_src = pt.get("uhdoutput.pps_source", "none"); outputuhd_conf.pps_polarity = pt.get("uhdoutput.pps_polarity", "pos"); std::string behave = pt.get("uhdoutput.behaviour_refclk_lock_lost", "ignore"); if (behave == "crash") { outputuhd_conf.refclk_lock_loss_behaviour = CRASH; } else if (behave == "ignore") { outputuhd_conf.refclk_lock_loss_behaviour = IGNORE; } else { std::cerr << "Error: UHD output: behaviour_refclk_lock_lost invalid." << std::endl; throw std::runtime_error("Configuration error"); } outputuhd_conf.maxGPSHoldoverTime = pt.get("uhdoutput.max_gps_holdover_time", 0); useUHDOutput = 1; } #endif #if defined(HAVE_SOAPYSDR) else if (output_selected == "soapysdr") { outputsoapy_conf.device = pt.get("soapyoutput.device", ""); outputsoapy_conf.masterClockRate = pt.get("soapyoutput.master_clock_rate", 0); outputsoapy_conf.txgain = pt.get("soapyoutput.txgain", 0.0); outputsoapy_conf.frequency = pt.get("soapyoutput.frequency", 0); std::string chan = pt.get("soapyoutput.channel", ""); outputsoapy_conf.dabMode = dabMode; if (outputsoapy_conf.frequency == 0 && chan == "") { std::cerr << " soapy output enabled, but neither frequency nor channel defined.\n"; throw std::runtime_error("Configuration error"); } else if (outputsoapy_conf.frequency == 0) { outputsoapy_conf.frequency = parseChannel(chan); } else if (outputsoapy_conf.frequency != 0 && chan != "") { std::cerr << " soapy output: cannot define both frequency and channel.\n"; throw std::runtime_error("Configuration error"); } useSoapyOutput = 1; } #endif #if defined(HAVE_ZEROMQ) else if (output_selected == "zmq") { outputName = pt.get("zmqoutput.listen"); zmqOutputSocketType = pt.get("zmqoutput.socket_type"); useZeroMQOutput = 1; } #endif else { std::cerr << "Error: Invalid output defined.\n"; throw std::runtime_error("Configuration error"); } #if defined(HAVE_OUTPUT_UHD) outputuhd_conf.enableSync = (pt.get("delaymanagement.synchronous", 0) == 1); if (outputuhd_conf.enableSync) { std::string delay_mgmt = pt.get("delaymanagement.management", ""); std::string fixedoffset = pt.get("delaymanagement.fixedoffset", ""); std::string offset_filename = pt.get("delaymanagement.dynamicoffsetfile", ""); if (not(delay_mgmt.empty() and fixedoffset.empty() and offset_filename.empty())) { std::cerr << "Warning: you are using the old config syntax for the offset management.\n"; std::cerr << " Please see the example.ini configuration for the new settings.\n"; } try { tist_offset_s = pt.get("delaymanagement.offset"); } catch (std::exception &e) { std::cerr << "Error: delaymanagement: synchronous is enabled, but no offset defined!\n"; throw std::runtime_error("Configuration error"); } } outputuhd_conf.muteNoTimestamps = (pt.get("delaymanagement.mutenotimestamps", 0) == 1); #endif /* Read TII parameters from config file */ tiiConfig.enable = pt.get("tii.enable", 0); tiiConfig.comb = pt.get("tii.comb", 0); tiiConfig.pattern = pt.get("tii.pattern", 0); tiiConfig.old_variant = pt.get("tii.old_variant", 0); } etiLog.level(info) << "Starting up version " << #if defined(GITVERSION) GITVERSION; #else VERSION; #endif // When using the FIRFilter, increase the modulator offset pipelining delay // by the correct amount if (not filterTapsFilename.empty()) { tist_delay_stages += FIRFILTER_PIPELINE_DELAY; } // Setting ETI input filename if (use_configuration_cmdline && inputName == "") { if (optind < argc) { inputName = argv[optind++]; if (inputName.substr(0, 4) == "zmq+" && inputName.find("://") != std::string::npos) { // if the name starts with zmq+XYZ://somewhere:port inputTransport = "zeromq"; } else if (inputName.substr(0, 6) == "tcp://") { inputTransport = "tcp"; } else if (inputName.substr(0, 6) == "udp://") { inputTransport = "edi"; } } else { inputName = "/dev/stdin"; } } // Checking unused arguments if (use_configuration_cmdline && optind != argc) { fprintf(stderr, "Invalid arguments:"); while (optind != argc) { fprintf(stderr, " %s", argv[optind++]); } fprintf(stderr, "\n"); printUsage(argv[0]); ret = -1; etiLog.level(error) << "Received invalid command line arguments"; throw std::invalid_argument("Invalid command line options"); } if (!useFileOutput && !useUHDOutput && !useZeroMQOutput && !useSoapyOutput) { etiLog.level(error) << "Output not specified"; fprintf(stderr, "Must specify output !"); throw std::runtime_error("Configuration error"); } // Print settings fprintf(stderr, "Input\n"); fprintf(stderr, " Type: %s\n", inputTransport.c_str()); fprintf(stderr, " Source: %s\n", inputName.c_str()); fprintf(stderr, "Output\n"); if (useFileOutput) { fprintf(stderr, " Name: %s\n", outputName.c_str()); } #if defined(HAVE_OUTPUT_UHD) else if (useUHDOutput) { fprintf(stderr, " UHD\n" " Device: %s\n" " Type: %s\n" " master_clock_rate: %ld\n" " refclk: %s\n" " pps source: %s\n", outputuhd_conf.device.c_str(), outputuhd_conf.usrpType.c_str(), outputuhd_conf.masterClockRate, outputuhd_conf.refclk_src.c_str(), outputuhd_conf.pps_src.c_str()); } #endif #if defined(HAVE_SOAPYSDR) else if (useSoapyOutput) { fprintf(stderr, " SoapySDR\n" " Device: %s\n" " master_clock_rate: %ld\n", outputsoapy_conf.device.c_str(), outputsoapy_conf.masterClockRate); } #endif else if (useZeroMQOutput) { fprintf(stderr, " ZeroMQ\n" " Listening on: %s\n" " Socket type : %s\n", outputName.c_str(), zmqOutputSocketType.c_str()); } fprintf(stderr, " Sampling rate: "); if (outputRate > 1000) { if (outputRate > 1000000) { fprintf(stderr, "%.4g MHz\n", outputRate / 1000000.0f); } else { fprintf(stderr, "%.4g kHz\n", outputRate / 1000.0f); } } else { fprintf(stderr, "%zu Hz\n", outputRate); } EdiReader ediReader(tist_offset_s, tist_delay_stages); EdiDecoder::ETIDecoder ediInput(ediReader, false); if (edi_max_delay_ms > 0.0f) { // setMaxDelay wants number of AF packets, which correspond to 24ms ETI frames ediInput.setMaxDelay(lroundf(edi_max_delay_ms / 24.0f)); } EdiUdpInput ediUdpInput(ediInput); if (inputTransport == "file") { // Opening ETI input file if (inputFileReader.Open(inputName, loop) == -1) { fprintf(stderr, "Unable to open input file!\n"); etiLog.level(error) << "Unable to open input file!"; ret = -1; throw std::runtime_error("Unable to open input"); } m.inputReader = &inputFileReader; } else if (inputTransport == "zeromq") { #if !defined(HAVE_ZEROMQ) fprintf(stderr, "Error, ZeroMQ input transport selected, but not compiled in!\n"); ret = -1; throw std::runtime_error("Unable to open input"); #else inputZeroMQReader->Open(inputName, inputMaxFramesQueued); m.inputReader = inputZeroMQReader.get(); #endif } else if (inputTransport == "tcp") { inputTcpReader->Open(inputName); m.inputReader = inputTcpReader.get(); } else if (inputTransport == "edi") { ediUdpInput.Open(inputName); } else { fprintf(stderr, "Error, invalid input transport %s selected!\n", inputTransport.c_str()); ret = -1; throw std::runtime_error("Unable to open input"); } if (useFileOutput) { if (fileOutputFormat == "complexf") { output = make_shared(outputName); } else if (fileOutputFormat == "s8") { // We must normalise the samples to the interval [-127.0; 127.0] normalise = 127.0f / normalise_factor; format_converter = make_shared(); output = make_shared(outputName); } } #if defined(HAVE_OUTPUT_UHD) else if (useUHDOutput) { normalise = 1.0f / normalise_factor; outputuhd_conf.sampleRate = outputRate; output = make_shared(outputuhd_conf); rcs.enrol((OutputUHD*)output.get()); } #endif #if defined(HAVE_SOAPYSDR) else if (useSoapyOutput) { /* We normalise the same way as for the UHD output */ normalise = 1.0f / normalise_factor; outputsoapy_conf.sampleRate = outputRate; output = make_shared(outputsoapy_conf); rcs.enrol((OutputSoapy*)output.get()); } #endif #if defined(HAVE_ZEROMQ) else if (useZeroMQOutput) { /* We normalise the same way as for the UHD output */ normalise = 1.0f / normalise_factor; if (zmqOutputSocketType == "pub") { output = make_shared(outputName, ZMQ_PUB); } else if (zmqOutputSocketType == "rep") { output = make_shared(outputName, ZMQ_REP); } else { std::stringstream ss; ss << "ZeroMQ output socket type " << zmqOutputSocketType << " invalid"; throw std::invalid_argument(ss.str()); } } #endif // Set thread priority to realtime if (int r = set_realtime_prio(1)) { etiLog.level(error) << "Could not set priority for modulator:" << r; } set_thread_name("modulator"); if (inputTransport == "edi") { if (not ediUdpInput.isEnabled()) { etiLog.level(error) << "inputTransport is edi, but ediUdpInput is not enabled"; return -1; } Flowgraph flowgraph; auto modulator = make_shared( ediReader, tiiConfig, outputRate, clockRate, dabMode, gainMode, digitalgain, normalise, filterTapsFilename); if (format_converter) { flowgraph.connect(modulator, format_converter); flowgraph.connect(format_converter, output); } else { flowgraph.connect(modulator, output); } #if defined(HAVE_OUTPUT_UHD) if (useUHDOutput) { ((OutputUHD*)output.get())->setETISource(modulator->getEtiSource()); } #endif #if defined(HAVE_SOAPYSDR) if (useSoapyOutput) { ((OutputSoapy*)output.get())->setETISource(modulator->getEtiSource()); } #endif size_t framecount = 0; while (running) { while (not ediReader.isFrameReady()) { bool success = ediUdpInput.rxPacket(); if (not success) { running = false; break; } } framecount++; flowgraph.run(); ediReader.clearFrame(); /* Check every once in a while if the remote control * is still working */ if ((framecount % 250) == 0) { rcs.check_faults(); } } } else { while (run_again) { Flowgraph flowgraph; m.flowgraph = &flowgraph; m.data.setLength(6144); EtiReader etiReader(tist_offset_s, tist_delay_stages); m.etiReader = &etiReader; auto input = make_shared(&m.data); auto modulator = make_shared( etiReader, tiiConfig, outputRate, clockRate, dabMode, gainMode, digitalgain, normalise, filterTapsFilename); if (format_converter) { flowgraph.connect(modulator, format_converter); flowgraph.connect(format_converter, output); } else { flowgraph.connect(modulator, output); } #if defined(HAVE_OUTPUT_UHD) if (useUHDOutput) { ((OutputUHD*)output.get())->setETISource(modulator->getEtiSource()); } #endif #if defined(HAVE_SOAPYSDR) if (useSoapyOutput) { ((OutputSoapy*)output.get())->setETISource(modulator->getEtiSource()); } #endif m.inputReader->PrintInfo(); run_modulator_state_t st = run_modulator(m); etiLog.log(trace, "DABMOD,run_modulator() = %d", st); switch (st) { case run_modulator_state_t::failure: etiLog.level(error) << "Modulator failure."; run_again = false; ret = 1; break; case run_modulator_state_t::again: etiLog.level(warn) << "Restart modulator."; run_again = false; if (inputTransport == "file") { if (inputFileReader.Open(inputName, loop) == -1) { etiLog.level(error) << "Unable to open input file!"; ret = 1; } else { run_again = true; } } else if (inputTransport == "zeromq") { #if defined(HAVE_ZEROMQ) run_again = true; // Create a new input reader inputZeroMQReader = make_shared(); inputZeroMQReader->Open(inputName, inputMaxFramesQueued); m.inputReader = inputZeroMQReader.get(); #endif } else if (inputTransport == "tcp") { inputTcpReader = make_shared(); inputTcpReader->Open(inputName); m.inputReader = inputTcpReader.get(); } break; case run_modulator_state_t::reconfigure: etiLog.level(warn) << "Detected change in ensemble configuration."; /* We can keep the input in this care */ run_again = true; break; case run_modulator_state_t::normal_end: default: etiLog.level(info) << "modulator stopped."; ret = 0; run_again = false; break; } fprintf(stderr, "\n\n"); etiLog.level(info) << m.framecount << " DAB frames encoded"; etiLog.level(info) << ((float)m.framecount * 0.024f) << " seconds encoded"; m.data.setLength(0); } } etiLog.level(info) << "Terminating"; return ret; } run_modulator_state_t run_modulator(modulator_data& m) { auto ret = run_modulator_state_t::failure; try { while (running) { int framesize; PDEBUG("*****************************************\n"); PDEBUG("* Starting main loop\n"); PDEBUG("*****************************************\n"); while ((framesize = m.inputReader->GetNextFrame(m.data.getData())) > 0) { if (!running) { break; } m.framecount++; PDEBUG("*****************************************\n"); PDEBUG("* Read frame %lu\n", m.framecount); PDEBUG("*****************************************\n"); const int eti_bytes_read = m.etiReader->loadEtiData(m.data); if ((size_t)eti_bytes_read != m.data.getLength()) { etiLog.level(error) << "ETI frame incompletely read"; throw std::runtime_error("ETI read error"); } m.flowgraph->run(); /* Check every once in a while if the remote control * is still working */ if ((m.framecount % 250) == 0) { rcs.check_faults(); } } if (framesize == 0) { etiLog.level(info) << "End of file reached."; } else { etiLog.level(error) << "Input read error."; } running = 0; ret = run_modulator_state_t::normal_end; } } catch (zmq_input_overflow& e) { // The ZeroMQ input has overflowed its buffer etiLog.level(warn) << e.what(); ret = run_modulator_state_t::again; } catch (std::out_of_range& e) { // One of the DSP blocks has detected an invalid change // or value in some settings. This can be due to a multiplex // reconfiguration. etiLog.level(warn) << e.what(); ret = run_modulator_state_t::reconfigure; } catch (std::exception& e) { etiLog.level(error) << "Exception caught: " << e.what(); ret = run_modulator_state_t::failure; } return ret; } int main(int argc, char* argv[]) { try { return launch_modulator(argc, argv); } catch (std::invalid_argument& e) { std::string what(e.what()); if (not what.empty()) { std::cerr << "Modulator error: " << what << std::endl; } } catch (std::runtime_error& e) { std::cerr << "Modulator runtime error: " << e.what() << std::endl; } }