diff options
Diffstat (limited to 'src/DabMod.cpp')
-rw-r--r-- | src/DabMod.cpp | 447 |
1 files changed, 246 insertions, 201 deletions
diff --git a/src/DabMod.cpp b/src/DabMod.cpp index f97c05d..e0e8a5b 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2019 + Copyright (C) 2023 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -25,6 +25,7 @@ along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>. */ +#include <fftw3.h> #ifdef HAVE_CONFIG_H # include "config.h" #endif @@ -46,6 +47,7 @@ # include <netinet/in.h> #endif +#include "Events.h" #include "Utils.h" #include "Log.h" #include "DabModulator.h" @@ -56,6 +58,7 @@ #include "output/SDR.h" #include "output/UHD.h" #include "output/Soapy.h" +#include "output/Dexter.h" #include "output/Lime.h" #include "output/BladeRF.h" #include "OutputZeroMQ.h" @@ -93,18 +96,147 @@ void signalHandler(int signalNb) running = 0; } -struct modulator_data -{ - // For ETI - std::shared_ptr<InputReader> inputReader; - std::shared_ptr<EtiReader> etiReader; +class ModulatorData : public RemoteControllable { + public: + // For ETI + std::shared_ptr<InputReader> inputReader; + std::shared_ptr<EtiReader> etiReader; + + // For EDI + std::shared_ptr<EdiInput> ediInput; + + // Common to both EDI and EDI + uint64_t framecount = 0; + Flowgraph *flowgraph = nullptr; + + + // RC-related + ModulatorData() : RemoteControllable("mainloop") { + RC_ADD_PARAMETER(num_modulator_restarts, "(Read-only) Number of mod restarts"); + RC_ADD_PARAMETER(most_recent_edi_decoded, "(Read-only) UNIX Timestamp of most recently decoded EDI frame"); + RC_ADD_PARAMETER(edi_source, "(Read-only) URL of the EDI/TCP source"); + RC_ADD_PARAMETER(running_since, "(Read-only) UNIX Timestamp of most recent modulator restart"); + RC_ADD_PARAMETER(ensemble_label, "(Read-only) Label of the ensemble"); + RC_ADD_PARAMETER(ensemble_eid, "(Read-only) Ensemble ID"); + RC_ADD_PARAMETER(ensemble_services, "(Read-only, only JSON) Ensemble service information"); + RC_ADD_PARAMETER(num_services, "(Read-only) Number of services in the ensemble"); + } - // For EDI - std::shared_ptr<EdiInput> ediInput; + virtual ~ModulatorData() {} + + virtual void set_parameter(const std::string& parameter, const std::string& value) { + throw ParameterError("Parameter " + parameter + " is read-only"); + } - // Common to both EDI and EDI - uint64_t framecount = 0; - Flowgraph *flowgraph = nullptr; + virtual const std::string get_parameter(const std::string& parameter) const { + stringstream ss; + if (parameter == "num_modulator_restarts") { + ss << num_modulator_restarts; + } + if (parameter == "running_since") { + ss << running_since; + } + else if (parameter == "most_recent_edi_decoded") { + ss << most_recent_edi_decoded; + } + else if (parameter == "ensemble_label") { + if (ediInput) { + const auto ens = ediInput->ediReader.getEnsembleInfo(); + if (ens) { + ss << FICDecoder::ConvertLabelToUTF8(ens->label, nullptr); + } + else { + throw ParameterError("Not available yet"); + } + } + else { + throw ParameterError("Not available yet"); + } + } + else if (parameter == "ensemble_eid") { + if (ediInput) { + const auto ens = ediInput->ediReader.getEnsembleInfo(); + if (ens) { + ss << ens->eid; + } + else { + throw ParameterError("Not available yet"); + } + } + else { + throw ParameterError("Not available yet"); + } + } + else if (parameter == "edi_source") { + if (ediInput) { + ss << ediInput->ediTransport.getTcpUri(); + } + else { + throw ParameterError("Not available yet"); + } + } + else if (parameter == "num_services") { + if (ediInput) { + ss << ediInput->ediReader.getSubchannels().size(); + } + else { + throw ParameterError("Not available yet"); + } + } + else if (parameter == "ensemble_services") { + throw ParameterError("ensemble_services is only available through 'showjson'"); + } + else { + ss << "Parameter '" << parameter << + "' is not exported by controllable " << get_rc_name(); + throw ParameterError(ss.str()); + } + return ss.str(); + } + + virtual const json::map_t get_all_values() const + { + json::map_t map; + map["num_modulator_restarts"].v = num_modulator_restarts; + map["running_since"].v = running_since; + map["most_recent_edi_decoded"].v = most_recent_edi_decoded; + + if (ediInput) { + map["edi_source"].v = ediInput->ediTransport.getTcpUri(); + map["num_services"].v = ediInput->ediReader.getSubchannels().size(); + + const auto ens = ediInput->ediReader.getEnsembleInfo(); + if (ens) { + map["ensemble_label"].v = FICDecoder::ConvertLabelToUTF8(ens->label, nullptr); + map["ensemble_eid"].v = ens->eid; + } + else { + map["ensemble_label"].v = nullopt; + map["ensemble_eid"].v = nullopt; + } + + std::vector<json::value_t> services; + + for (const auto& s : ediInput->ediReader.getServiceInfo()) { + auto service_map = make_shared<json::map_t>(); + (*service_map)["sad"].v = s.second.subchannel.start; + (*service_map)["sid"].v = s.second.sid; + (*service_map)["label"].v = FICDecoder::ConvertLabelToUTF8(s.second.label, nullptr); + (*service_map)["bitrate"].v = s.second.subchannel.bitrate; + json::value_t v; + v.v = service_map; + services.push_back(v); + } + + map["ensemble_services"].v = services; + + } + return map; + } + + size_t num_modulator_restarts = 0; + time_t most_recent_edi_decoded = 0; + time_t running_since = 0; }; enum class run_modulator_state_t { @@ -114,86 +246,10 @@ enum class run_modulator_state_t { reconfigure // Some sort of change of configuration we cannot handle happened }; -static run_modulator_state_t run_modulator(modulator_data& m); +static run_modulator_state_t run_modulator(const mod_settings_t& mod_settings, ModulatorData& m); -static void printModSettings(const mod_settings_t& mod_settings) -{ - stringstream ss; - // Print settings - ss << "Input\n"; - ss << " Type: " << mod_settings.inputTransport << "\n"; - ss << " Source: " << mod_settings.inputName << "\n"; - - ss << "Output\n"; - if (mod_settings.useFileOutput) { - ss << " Name: " << mod_settings.outputName << "\n"; - } -#if defined(HAVE_OUTPUT_UHD) - else if (mod_settings.useUHDOutput) { - ss << " UHD\n" << - " Device: " << mod_settings.sdr_device_config.device << "\n" << - " Subdevice: " << - mod_settings.sdr_device_config.subDevice << "\n" << - " master_clock_rate: " << - mod_settings.sdr_device_config.masterClockRate << "\n" << - " refclk: " << - mod_settings.sdr_device_config.refclk_src << "\n" << - " pps source: " << - mod_settings.sdr_device_config.pps_src << "\n"; - } -#endif -#if defined(HAVE_SOAPYSDR) - else if (mod_settings.useSoapyOutput) { - ss << " SoapySDR\n" - " Device: " << mod_settings.sdr_device_config.device << "\n" << - " master_clock_rate: " << - mod_settings.sdr_device_config.masterClockRate << "\n"; - } -#endif -#if defined(HAVE_LIMESDR) - else if (mod_settings.useLimeOutput) { - ss << " LimeSDR\n" - " Device: " << mod_settings.sdr_device_config.device << "\n" << - " master_clock_rate: " << - mod_settings.sdr_device_config.masterClockRate << "\n"; - } -#endif -#if defined(HAVE_BLADERF) - else if (mod_settings.useBladeRFOutput) { - ss << " BladeRF\n" - " Device: " << mod_settings.sdr_device_config.device << "\n" << - " refclk: " << mod_settings.sdr_device_config.refclk_src << "\n"; - } -#endif - else if (mod_settings.useZeroMQOutput) { - ss << " ZeroMQ\n" << - " Listening on: " << mod_settings.outputName << "\n" << - " Socket type : " << mod_settings.zmqOutputSocketType << "\n"; - } - - ss << " Sampling rate: "; - if (mod_settings.outputRate > 1000) { - if (mod_settings.outputRate > 1000000) { - ss << std::fixed << std::setprecision(4) << - mod_settings.outputRate / 1000000.0 << - " MHz\n"; - } - else { - ss << std::fixed << std::setprecision(4) << - mod_settings.outputRate / 1000.0 << - " kHz\n"; - } - } - else { - ss << std::fixed << std::setprecision(4) << - mod_settings.outputRate << " Hz\n"; - } - fprintf(stderr, "%s", ss.str().c_str()); -} - -static shared_ptr<ModOutput> prepare_output( - mod_settings_t& s) +static shared_ptr<ModOutput> prepare_output(mod_settings_t& s) { shared_ptr<ModOutput> output; @@ -249,6 +305,16 @@ static shared_ptr<ModOutput> prepare_output( rcs.enrol((Output::SDR*)output.get()); } #endif +#if defined(HAVE_DEXTER) + else if (s.useDexterOutput) { + /* We normalise specifically range [-32768; 32767] */ + s.normalise = 32767.0f / normalise_factor; + s.sdr_device_config.sampleRate = s.outputRate; + auto dexterdevice = make_shared<Output::Dexter>(s.sdr_device_config); + output = make_shared<Output::SDR>(s.sdr_device_config, dexterdevice); + rcs.enrol((Output::SDR*)output.get()); + } +#endif #if defined(HAVE_LIMESDR) else if (s.useLimeOutput) { /* We normalise the same way as for the UHD output */ @@ -308,6 +374,8 @@ int launch_modulator(int argc, char* argv[]) mod_settings_t mod_settings; parse_args(argc, argv, mod_settings); + etiLog.register_backend(make_shared<LogToEventSender>()); + etiLog.level(info) << "Configuration parsed. Starting up version " << #if defined(GITVERSION) GITVERSION; @@ -319,6 +387,7 @@ int launch_modulator(int argc, char* argv[]) mod_settings.useUHDOutput or mod_settings.useZeroMQOutput or mod_settings.useSoapyOutput or + mod_settings.useDexterOutput or mod_settings.useLimeOutput or mod_settings.useBladeRFOutput)) { throw std::runtime_error("Configuration error: Output not specified"); @@ -326,19 +395,49 @@ int launch_modulator(int argc, char* argv[]) printModSettings(mod_settings); - shared_ptr<FormatConverter> format_converter; + ModulatorData m; + rcs.enrol(&m); + + { + // This is mostly useful on ARM systems where FFTW planning takes some time. If we do it here + // it will be done before the modulator starts up + etiLog.level(debug) << "Running FFTW planning..."; + constexpr size_t fft_size = 2048; // Transmission Mode I. If different, it'll recalculate on OfdmGenerator + // initialisation + auto *fft_in = (fftwf_complex*)fftwf_malloc(sizeof(fftwf_complex) * fft_size); + auto *fft_out = (fftwf_complex*)fftwf_malloc(sizeof(fftwf_complex) * fft_size); + if (fft_in == nullptr or fft_out == nullptr) { + throw std::runtime_error("FFTW malloc failed"); + } + fftwf_set_timelimit(2); + fftwf_plan plan = fftwf_plan_dft_1d(fft_size, fft_in, fft_out, FFTW_FORWARD, FFTW_MEASURE); + fftwf_destroy_plan(plan); + plan = fftwf_plan_dft_1d(fft_size, fft_in, fft_out, FFTW_BACKWARD, FFTW_MEASURE); + fftwf_destroy_plan(plan); + fftwf_free(fft_in); + fftwf_free(fft_out); + etiLog.level(debug) << "FFTW planning done."; + } + + std::string output_format; if (mod_settings.useFileOutput and (mod_settings.fileOutputFormat == "s8" or mod_settings.fileOutputFormat == "u8" or mod_settings.fileOutputFormat == "s16")) { - format_converter = make_shared<FormatConverter>(mod_settings.fileOutputFormat); + output_format = mod_settings.fileOutputFormat; + } + else if (mod_settings.useBladeRFOutput or mod_settings.useDexterOutput) { + output_format = "s16"; } - else if (mod_settings.useBladeRFOutput) { - format_converter = make_shared<FormatConverter>(mod_settings.BladeRFOutputFormat); - } auto output = prepare_output(mod_settings); + if (not output_format.empty()) { + if (auto o = dynamic_pointer_cast<Output::SDR>(output)) { + o->set_sample_size(FormatConverter::get_format_size(output_format)); + } + } + // Set thread priority to realtime if (int r = set_realtime_prio(1)) { etiLog.level(error) << "Could not set priority for modulator:" << r; @@ -365,17 +464,6 @@ int launch_modulator(int argc, char* argv[]) inputReader = inputFileReader; } - else if (mod_settings.inputTransport == "zeromq") { -#if !defined(HAVE_ZEROMQ) - throw std::runtime_error("Unable to open input: " - "ZeroMQ input transport selected, but not compiled in!"); -#else - auto inputZeroMQReader = make_shared<InputZeroMQReader>(); - inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued); - rcs.enrol(inputZeroMQReader.get()); - inputReader = inputZeroMQReader; -#endif - } else if (mod_settings.inputTransport == "tcp") { auto inputTcpReader = make_shared<InputTcpReader>(); inputTcpReader->Open(mod_settings.inputName); @@ -386,40 +474,37 @@ int launch_modulator(int argc, char* argv[]) "invalid input transport " + mod_settings.inputTransport + " selected!"); } + m.ediInput = ediInput; + m.inputReader = inputReader; + bool run_again = true; while (run_again) { + m.running_since = get_clock_realtime_seconds(); + Flowgraph flowgraph(mod_settings.showProcessTime); - modulator_data m; - m.ediInput = ediInput; - m.inputReader = inputReader; + m.framecount = 0; m.flowgraph = &flowgraph; shared_ptr<DabModulator> modulator; if (inputReader) { m.etiReader = make_shared<EtiReader>(mod_settings.tist_offset_s); - modulator = make_shared<DabModulator>(*m.etiReader, mod_settings); + modulator = make_shared<DabModulator>(*m.etiReader, mod_settings, output_format); } else if (ediInput) { - modulator = make_shared<DabModulator>(ediInput->ediReader, mod_settings); + modulator = make_shared<DabModulator>(ediInput->ediReader, mod_settings, output_format); } rcs.enrol(modulator.get()); - if (format_converter) { - flowgraph.connect(modulator, format_converter); - flowgraph.connect(format_converter, output); - } - else { - flowgraph.connect(modulator, output); - } + flowgraph.connect(modulator, output); if (inputReader) { etiLog.level(info) << inputReader->GetPrintableInfo(); } - run_modulator_state_t st = run_modulator(m); + run_modulator_state_t st = run_modulator(mod_settings, m); etiLog.log(trace, "DABMOD,run_modulator() = %d", st); switch (st) { @@ -440,17 +525,6 @@ int launch_modulator(int argc, char* argv[]) run_again = true; } } -#if defined(HAVE_ZEROMQ) - else if (auto in_zmq = dynamic_pointer_cast<InputZeroMQReader>(inputReader)) { - run_again = true; - // Create a new input reader - rcs.remove_controllable(in_zmq.get()); - auto inputZeroMQReader = make_shared<InputZeroMQReader>(); - inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued); - rcs.enrol(inputZeroMQReader.get()); - inputReader = inputZeroMQReader; - } -#endif else if (dynamic_pointer_cast<InputTcpReader>(inputReader)) { // Keep the same inputReader, as there is no input buffer overflow run_again = true; @@ -473,28 +547,21 @@ int launch_modulator(int argc, char* argv[]) break; } - etiLog.level(info) << m.framecount << " DAB frames encoded"; - etiLog.level(info) << ((float)m.framecount * 0.024f) << " seconds encoded"; + etiLog.level(info) << m.framecount << " DAB frames, " << ((float)m.framecount * 0.024f) << " seconds encoded"; + m.num_modulator_restarts++; } etiLog.level(info) << "Terminating"; return ret; } -struct zmq_input_timeout : public std::exception -{ - const char* what() const throw() - { - return "InputZMQ timeout"; - } -}; - -static run_modulator_state_t run_modulator(modulator_data& m) +static run_modulator_state_t run_modulator(const mod_settings_t& mod_settings, ModulatorData& m) { auto ret = run_modulator_state_t::failure; try { int last_eti_fct = -1; auto last_frame_received = chrono::steady_clock::now(); + frame_timestamp ts; Buffer data; if (m.inputReader) { data.setLength(6144); @@ -515,36 +582,9 @@ static run_modulator_state_t run_modulator(modulator_data& m) ret = run_modulator_state_t::normal_end; break; } -#if defined(HAVE_ZEROMQ) - else if (dynamic_pointer_cast<InputZeroMQReader>(m.inputReader)) { - /* An empty frame marks a timeout. We ignore it, but we are - * now able to handle SIGINT properly. - * - * Also, we reconnect zmq every 10 seconds to avoid some - * issues, discussed in - * https://stackoverflow.com/questions/26112992/zeromq-pub-sub-on-unreliable-connection - * - * > It is possible that the PUB socket sees the error - * > while the SUB socket does not. - * > - * > The ZMTP RFC has a proposal for heartbeating that would - * > solve this problem. The current best solution is for - * > PUB sockets to send heartbeats (e.g. 1 per second) when - * > traffic is low, and for SUB sockets to disconnect / - * > reconnect if they stop getting these. - * - * We don't need a heartbeat, because our application is constant frame rate, - * the frames themselves can act as heartbeats. - */ - - const auto now = chrono::steady_clock::now(); - if (last_frame_received + chrono::seconds(10) < now) { - throw zmq_input_timeout(); - } - } -#endif // defined(HAVE_ZEROMQ) else if (dynamic_pointer_cast<InputTcpReader>(m.inputReader)) { - /* Same as for ZeroMQ */ + /* An empty frame marks a timeout. We ignore it, but we are + * now able to handle SIGINT properly. */ } else { throw logic_error("Unhandled framesize==0!"); @@ -568,6 +608,7 @@ static run_modulator_state_t run_modulator(modulator_data& m) fct = m.etiReader->getFct(); fp = m.etiReader->getFp(); + ts = m.etiReader->getTimestamp(); } else if (m.ediInput) { while (running and not m.ediInput->ediReader.isFrameReady()) { @@ -594,39 +635,53 @@ static run_modulator_state_t run_modulator(modulator_data& m) break; } + m.most_recent_edi_decoded = get_clock_realtime_seconds(); fct = m.ediInput->ediReader.getFct(); fp = m.ediInput->ediReader.getFp(); + ts = m.ediInput->ediReader.getTimestamp(); } - const unsigned expected_fct = (last_eti_fct + 1) % 250; - if (last_eti_fct == -1) { - if (fp != 0) { - // Do not start the flowgraph before we get to FP 0 - // to ensure all blocks are properly aligned. - if (m.ediInput) { - m.ediInput->ediReader.clearFrame(); + // timestamp is good if we run unsynchronised, or if margin is sufficient + bool ts_good = not mod_settings.sdr_device_config.enableSync or + (ts.timestamp_valid and ts.offset_to_system_time() > 0.2); + + if (!ts_good) { + etiLog.level(warn) << "Modulator skipping frame " << fct << + " TS " << (ts.timestamp_valid ? "valid" : "invalid") << + " offset " << (ts.timestamp_valid ? ts.offset_to_system_time() : 0); + } + else { + bool modulate = true; + if (last_eti_fct == -1) { + if (fp != 0) { + // Do not start the flowgraph before we get to FP 0 + // to ensure all blocks are properly aligned. + modulate = false; + } + else { + last_eti_fct = fct; } - continue; } else { - last_eti_fct = fct; + const unsigned expected_fct = (last_eti_fct + 1) % 250; + if (fct == expected_fct) { + last_eti_fct = fct; + } + else { + etiLog.level(warn) << "ETI FCT discontinuity, expected " << + expected_fct << " received " << fct; + if (m.ediInput) { + m.ediInput->ediReader.clearFrame(); + } + return run_modulator_state_t::again; + } + } + + if (modulate) { m.framecount++; m.flowgraph->run(); } } - else if (fct == expected_fct) { - last_eti_fct = fct; - m.framecount++; - m.flowgraph->run(); - } - else { - etiLog.level(info) << "ETI FCT discontinuity, expected " << - expected_fct << " received " << fct; - if (m.ediInput) { - m.ediInput->ediReader.clearFrame(); - } - return run_modulator_state_t::again; - } if (m.ediInput) { m.ediInput->ediReader.clearFrame(); @@ -639,16 +694,6 @@ static run_modulator_state_t run_modulator(modulator_data& m) } } } - catch (const zmq_input_timeout&) { - // The ZeroMQ input timeout - etiLog.level(warn) << "Timeout"; - ret = run_modulator_state_t::again; - } - catch (const zmq_input_overflow& e) { - // The ZeroMQ input has overflowed its buffer - etiLog.level(warn) << e.what(); - ret = run_modulator_state_t::again; - } catch (const FrameMultiplexerError& e) { // The FrameMultiplexer saw an error or a change in the size of a // subchannel. This can be due to a multiplex reconfiguration. |