diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-09-05 11:01:56 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-09-05 11:01:56 +0200 |
commit | 32f9e6e8bfa584f0dfb155c0bb7cdc843614af5c (patch) | |
tree | 59cd09a0eaa6469ba7aed1b9b167cfc8643ec68f /src/DabMod.cpp | |
parent | 0c532ab20b47d661f098cc6089184303dce65d56 (diff) | |
download | dabmod-32f9e6e8bfa584f0dfb155c0bb7cdc843614af5c.tar.gz dabmod-32f9e6e8bfa584f0dfb155c0bb7cdc843614af5c.tar.bz2 dabmod-32f9e6e8bfa584f0dfb155c0bb7cdc843614af5c.zip |
Rework EDI input
- Use same main loop for both ETI and EDI inputs
- Test SFN functionality with EDI input
- Add log.show_process_time setting for process time printout
Diffstat (limited to 'src/DabMod.cpp')
-rw-r--r-- | src/DabMod.cpp | 480 |
1 files changed, 227 insertions, 253 deletions
diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 81882e4..922f9e4 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -59,7 +59,6 @@ #include "OutputZeroMQ.h" #include "InputReader.h" #include "PcDebug.h" -#include "TimestampDecoder.h" #include "FIRFilter.h" #include "RemoteControl.h" #include "ConfigParser.h" @@ -94,12 +93,16 @@ void signalHandler(int signalNb) struct modulator_data { + // For ETI std::shared_ptr<InputReader> inputReader; - Buffer data; - uint64_t framecount = 0; + std::shared_ptr<EtiReader> etiReader; + + // For EDI + std::shared_ptr<EdiInput> ediInput; - Flowgraph* flowgraph = nullptr; - EtiReader* etiReader = nullptr; + // Common to both EDI and EDI + uint64_t framecount = 0; + Flowgraph *flowgraph = nullptr; }; enum class run_modulator_state_t { @@ -299,205 +302,137 @@ int launch_modulator(int argc, char* argv[]) etiLog.level(error) << "Could not set priority for modulator:" << r; } + shared_ptr<InputReader> inputReader; + shared_ptr<EdiInput> ediInput; + if (mod_settings.inputTransport == "edi") { -#ifdef HAVE_EDI - EdiReader ediReader(mod_settings.tist_offset_s); - EdiDecoder::ETIDecoder ediInput(ediReader, false); - if (mod_settings.edi_max_delay_ms > 0.0f) { - // setMaxDelay wants number of AF packets, which correspond to 24ms ETI frames - ediInput.setMaxDelay(lroundf(mod_settings.edi_max_delay_ms / 24.0f)); - } - EdiTransport ediTransport(ediInput); + ediInput = make_shared<EdiInput>(mod_settings.tist_offset_s, mod_settings.edi_max_delay_ms); - ediTransport.Open(mod_settings.inputName); - if (not ediTransport.isEnabled()) { + ediInput->ediTransport.Open(mod_settings.inputName); + if (not ediInput->ediTransport.isEnabled()) { throw runtime_error("inputTransport is edi, but ediTransport is not enabled"); } - Flowgraph flowgraph; - - auto modulator = make_shared<DabModulator>(ediReader, mod_settings); - rcs.enrol(modulator.get()); + } + else if (mod_settings.inputTransport == "file") { + auto inputFileReader = make_shared<InputFileReader>(); - if (format_converter) { - flowgraph.connect(modulator, format_converter); - flowgraph.connect(format_converter, output); - } - else { - flowgraph.connect(modulator, output); + // Opening ETI input file + if (inputFileReader->Open(mod_settings.inputName, mod_settings.loop) == -1) { + throw std::runtime_error("Unable to open input"); } - size_t framecount = 0; - - bool first_frame = true; - - auto frame_received_tp = chrono::steady_clock::now(); - - while (running) { - while (running and not ediReader.isFrameReady()) { - try { - bool packet_received = ediTransport.rxPacket(); - if (packet_received) { - frame_received_tp = chrono::steady_clock::now(); - } - } - catch (const std::runtime_error& e) { - etiLog.level(warn) << "EDI input: " << e.what(); - running = 0; - break; - } - - if (frame_received_tp + chrono::seconds(10) < chrono::steady_clock::now()) { - etiLog.level(error) << "No EDI data received in 10 seconds."; - running = 0; - break; - } - } - - if (not running) { - break; - } - - if (first_frame) { - if (ediReader.getFp() != 0) { - // Do not start the flowgraph before we get to FP 0 - // to ensure all blocks are properly aligned. - ediReader.clearFrame(); - continue; - } - else { - first_frame = false; - } - } - - framecount++; - flowgraph.run(); - ediReader.clearFrame(); - - /* Check every once in a while if the remote control - * is still working */ - if ((framecount % 250) == 0) { - rcs.check_faults(); - } - } -#else + inputReader = inputFileReader; + } + else if (mod_settings.inputTransport == "zeromq") { +#if !defined(HAVE_ZEROMQ) throw std::runtime_error("Unable to open input: " - "EDI input transport selected, but not compiled in!"); -#endif // HAVE_EDI + "ZeroMQ input transport selected, but not compiled in!"); +#else + auto inputZeroMQReader = make_shared<InputZeroMQReader>(); + inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued); + rcs.enrol(inputZeroMQReader.get()); + inputReader = inputZeroMQReader; +#endif + } + else if (mod_settings.inputTransport == "tcp") { + auto inputTcpReader = make_shared<InputTcpReader>(); + inputTcpReader->Open(mod_settings.inputName); + inputReader = inputTcpReader; } else { - shared_ptr<InputReader> inputReader; + throw std::runtime_error("Unable to open input: " + "invalid input transport " + mod_settings.inputTransport + " selected!"); + } - if (mod_settings.inputTransport == "file") { - auto inputFileReader = make_shared<InputFileReader>(); + bool run_again = true; - // Opening ETI input file - if (inputFileReader->Open(mod_settings.inputName, mod_settings.loop) == -1) { - throw std::runtime_error("Unable to open input"); - } + while (run_again) { + Flowgraph flowgraph(mod_settings.showProcessTime); - inputReader = inputFileReader; + modulator_data m; + m.ediInput = ediInput; + m.inputReader = inputReader; + m.flowgraph = &flowgraph; + + shared_ptr<DabModulator> modulator; + if (inputReader) { + m.etiReader = make_shared<EtiReader>(mod_settings.tist_offset_s); + modulator = make_shared<DabModulator>(*m.etiReader, mod_settings); } - else if (mod_settings.inputTransport == "zeromq") { -#if !defined(HAVE_ZEROMQ) - throw std::runtime_error("Unable to open input: " - "ZeroMQ input transport selected, but not compiled in!"); -#else - auto inputZeroMQReader = make_shared<InputZeroMQReader>(); - inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued); - rcs.enrol(inputZeroMQReader.get()); - inputReader = inputZeroMQReader; -#endif + else if (ediInput) { + modulator = make_shared<DabModulator>(ediInput->ediReader, mod_settings); } - else if (mod_settings.inputTransport == "tcp") { - auto inputTcpReader = make_shared<InputTcpReader>(); - inputTcpReader->Open(mod_settings.inputName); - inputReader = inputTcpReader; + + rcs.enrol(modulator.get()); + + if (format_converter) { + flowgraph.connect(modulator, format_converter); + flowgraph.connect(format_converter, output); } else { - throw std::runtime_error("Unable to open input: " - "invalid input transport " + mod_settings.inputTransport + " selected!"); + flowgraph.connect(modulator, output); } - bool run_again = true; - - while (run_again) { - Flowgraph flowgraph; - - modulator_data m; - m.inputReader = inputReader; - m.flowgraph = &flowgraph; - m.data.setLength(6144); - - EtiReader etiReader(mod_settings.tist_offset_s); - m.etiReader = &etiReader; - - auto input = make_shared<InputMemory>(&m.data); - auto modulator = make_shared<DabModulator>(etiReader, mod_settings); - rcs.enrol(modulator.get()); - - if (format_converter) { - flowgraph.connect(modulator, format_converter); - flowgraph.connect(format_converter, output); - } - else { - flowgraph.connect(modulator, output); - } - + if (inputReader) { etiLog.level(info) << inputReader->GetPrintableInfo(); + } - run_modulator_state_t st = run_modulator(m); - etiLog.log(trace, "DABMOD,run_modulator() = %d", st); - - switch (st) { - case run_modulator_state_t::failure: - etiLog.level(error) << "Modulator failure."; - run_again = false; - ret = 1; - break; - case run_modulator_state_t::again: - etiLog.level(warn) << "Restart modulator."; - run_again = false; - if (auto in = dynamic_pointer_cast<InputFileReader>(inputReader)) { - if (in->Open(mod_settings.inputName, mod_settings.loop) == -1) { - etiLog.level(error) << "Unable to open input file!"; - ret = 1; - } - else { - run_again = true; - } + run_modulator_state_t st = run_modulator(m); + etiLog.log(trace, "DABMOD,run_modulator() = %d", st); + + switch (st) { + case run_modulator_state_t::failure: + etiLog.level(error) << "Modulator failure."; + run_again = false; + ret = 1; + break; + case run_modulator_state_t::again: + etiLog.level(warn) << "Restart modulator."; + run_again = false; + if (auto in = dynamic_pointer_cast<InputFileReader>(inputReader)) { + if (in->Open(mod_settings.inputName, mod_settings.loop) == -1) { + etiLog.level(error) << "Unable to open input file!"; + ret = 1; } -#if defined(HAVE_ZEROMQ) - else if (auto in_zmq = dynamic_pointer_cast<InputZeroMQReader>(inputReader)) { + else { run_again = true; - // Create a new input reader - rcs.remove_controllable(in_zmq.get()); - auto inputZeroMQReader = make_shared<InputZeroMQReader>(); - inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued); - rcs.enrol(inputZeroMQReader.get()); - inputReader = inputZeroMQReader; } + } +#if defined(HAVE_ZEROMQ) + else if (auto in_zmq = dynamic_pointer_cast<InputZeroMQReader>(inputReader)) { + run_again = true; + // Create a new input reader + rcs.remove_controllable(in_zmq.get()); + auto inputZeroMQReader = make_shared<InputZeroMQReader>(); + inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued); + rcs.enrol(inputZeroMQReader.get()); + inputReader = inputZeroMQReader; + } #endif - else if (dynamic_pointer_cast<InputTcpReader>(inputReader)) { - // Keep the same inputReader, as there is no input buffer overflow - run_again = true; - } - break; - case run_modulator_state_t::reconfigure: - etiLog.level(warn) << "Detected change in ensemble configuration."; - /* We can keep the input in this care */ + else if (dynamic_pointer_cast<InputTcpReader>(inputReader)) { + // Keep the same inputReader, as there is no input buffer overflow run_again = true; - break; - case run_modulator_state_t::normal_end: - default: - etiLog.level(info) << "modulator stopped."; - ret = 0; - run_again = false; - break; - } - - etiLog.level(info) << m.framecount << " DAB frames encoded"; - etiLog.level(info) << ((float)m.framecount * 0.024f) << " seconds encoded"; + } + else if (ediInput) { + // In EDI, keep the same input + run_again = true; + } + break; + case run_modulator_state_t::reconfigure: + etiLog.level(warn) << "Detected change in ensemble configuration."; + /* We can keep the input in this case */ + run_again = true; + break; + case run_modulator_state_t::normal_end: + default: + etiLog.level(info) << "modulator stopped."; + ret = 0; + run_again = false; + break; } + + etiLog.level(info) << m.framecount << " DAB frames encoded"; + etiLog.level(info) << ((float)m.framecount * 0.024f) << " seconds encoded"; } etiLog.level(info) << "Terminating"; @@ -516,57 +451,142 @@ static run_modulator_state_t run_modulator(modulator_data& m) { auto ret = run_modulator_state_t::failure; try { - bool first_frame = true; int last_eti_fct = -1; auto last_frame_received = chrono::steady_clock::now(); + Buffer data; + if (m.inputReader) { + data.setLength(6144); + } while (running) { - int framesize; - - PDEBUG("*****************************************\n"); - PDEBUG("* Starting main loop\n"); - PDEBUG("*****************************************\n"); - while ((framesize = m.inputReader->GetNextFrame(m.data.getData())) > 0) { - if (!running) { - break; - } + while (true) { + unsigned fct = 0; + unsigned fp = 0; + + /* Load ETI data from the source */ + if (m.inputReader) { + int framesize = m.inputReader->GetNextFrame(data.getData()); + + if (framesize == 0) { + if (dynamic_pointer_cast<InputFileReader>(m.inputReader)) { + etiLog.level(info) << "End of file reached."; + running = 0; + ret = run_modulator_state_t::normal_end; + break; + } +#if defined(HAVE_ZEROMQ) + else if (dynamic_pointer_cast<InputZeroMQReader>(m.inputReader)) { + /* An empty frame marks a timeout. We ignore it, but we are + * now able to handle SIGINT properly. + * + * Also, we reconnect zmq every 10 seconds to avoid some + * issues, discussed in + * https://stackoverflow.com/questions/26112992/zeromq-pub-sub-on-unreliable-connection + * + * > It is possible that the PUB socket sees the error + * > while the SUB socket does not. + * > + * > The ZMTP RFC has a proposal for heartbeating that would + * > solve this problem. The current best solution is for + * > PUB sockets to send heartbeats (e.g. 1 per second) when + * > traffic is low, and for SUB sockets to disconnect / + * > reconnect if they stop getting these. + * + * We don't need a heartbeat, because our application is constant frame rate, + * the frames themselves can act as heartbeats. + */ + + const auto now = chrono::steady_clock::now(); + if (last_frame_received + chrono::seconds(10) < now) { + throw zmq_input_timeout(); + } + } +#endif // defined(HAVE_ZEROMQ) + else if (dynamic_pointer_cast<InputTcpReader>(m.inputReader)) { + /* Same as for ZeroMQ */ + } + else { + throw logic_error("Unhandled framesize==0!"); + } + } + else if (framesize < 0) { + etiLog.level(error) << "Input read error."; + running = 0; + ret = run_modulator_state_t::normal_end; + break; + } + + const int eti_bytes_read = m.etiReader->loadEtiData(data); + if ((size_t)eti_bytes_read != data.getLength()) { + etiLog.level(error) << "ETI frame incompletely read"; + throw std::runtime_error("ETI read error"); + } - last_frame_received = chrono::steady_clock::now(); + fct = m.etiReader->getFct(); + fp = m.etiReader->getFp(); + } + else if (m.ediInput) { + while (running and not m.ediInput->ediReader.isFrameReady()) { + try { + bool packet_received = m.ediInput->ediTransport.rxPacket(); + if (packet_received) { + last_frame_received = chrono::steady_clock::now(); + } + } + catch (const std::runtime_error& e) { + etiLog.level(warn) << "EDI input: " << e.what(); + running = 0; + break; + } - m.framecount++; + if (last_frame_received + chrono::seconds(10) < chrono::steady_clock::now()) { + etiLog.level(error) << "No EDI data received in 10 seconds."; + running = 0; + break; + } + } - PDEBUG("*****************************************\n"); - PDEBUG("* Read frame %lu\n", m.framecount); - PDEBUG("*****************************************\n"); + if (!running) { + break; + } - const int eti_bytes_read = m.etiReader->loadEtiData(m.data); - if ((size_t)eti_bytes_read != m.data.getLength()) { - etiLog.level(error) << "ETI frame incompletely read"; - throw std::runtime_error("ETI read error"); + fct = m.ediInput->ediReader.getFct(); + fp = m.ediInput->ediReader.getFp(); } - if (first_frame) { - if (m.etiReader->getFp() != 0) { + const unsigned expected_fct = (last_eti_fct + 1) % 250; + if (last_eti_fct == -1) { + if (fp != 0) { // Do not start the flowgraph before we get to FP 0 // to ensure all blocks are properly aligned. + if (m.ediInput) { + m.ediInput->ediReader.clearFrame(); + } continue; } else { - first_frame = false; + last_eti_fct = fct; + m.framecount++; + m.flowgraph->run(); } } - - // Check for ETI FCT continuity - const unsigned expected_fct = (last_eti_fct + 1) % 250; - const unsigned fct = m.etiReader->getFct(); - if (last_eti_fct != -1 and expected_fct != fct) { + else if (fct == expected_fct) { + last_eti_fct = fct; + m.framecount++; + m.flowgraph->run(); + } + else { etiLog.level(info) << "ETI FCT discontinuity, expected " << - expected_fct << " received " << m.etiReader->getFct(); + expected_fct << " received " << fct; + if (m.ediInput) { + m.ediInput->ediReader.clearFrame(); + } return run_modulator_state_t::again; } - last_eti_fct = fct; - m.flowgraph->run(); + if (m.ediInput) { + m.ediInput->ediReader.clearFrame(); + } /* Check every once in a while if the remote control * is still working */ @@ -574,52 +594,6 @@ static run_modulator_state_t run_modulator(modulator_data& m) rcs.check_faults(); } } - if (framesize == 0) { - if (dynamic_pointer_cast<InputFileReader>(m.inputReader)) { - etiLog.level(info) << "End of file reached."; - running = 0; - ret = run_modulator_state_t::normal_end; - } -#if defined(HAVE_ZEROMQ) - else if (dynamic_pointer_cast<InputZeroMQReader>(m.inputReader)) { - /* An empty frame marks a timeout. We ignore it, but we are - * now able to handle SIGINT properly. - * - * Also, we reconnect zmq every 10 seconds to avoid some - * issues, discussed in - * https://stackoverflow.com/questions/26112992/zeromq-pub-sub-on-unreliable-connection - * - * > It is possible that the PUB socket sees the error - * > while the SUB socket does not. - * > - * > The ZMTP RFC has a proposal for heartbeating that would - * > solve this problem. The current best solution is for - * > PUB sockets to send heartbeats (e.g. 1 per second) when - * > traffic is low, and for SUB sockets to disconnect / - * > reconnect if they stop getting these. - * - * We don't need a heartbeat, because our application is constant frame rate, - * the frames themselves can act as heartbeats. - */ - - const auto now = chrono::steady_clock::now(); - if (last_frame_received + chrono::seconds(10) < now) { - throw zmq_input_timeout(); - } - } -#endif // defined(HAVE_ZEROMQ) - else if (dynamic_pointer_cast<InputTcpReader>(m.inputReader)) { - /* Same as for ZeroMQ */ - } - else { - throw logic_error("Unhandled framesize==0!"); - } - } - else { - etiLog.level(error) << "Input read error."; - running = 0; - ret = run_modulator_state_t::normal_end; - } } } catch (const zmq_input_timeout&) { |